You can force durable subscriptions by setting a flag. Looks like this may not have made it into the documentation.
<networkConnectors> <networkConnector name="edf-to-thirdpartybroker-network-connector" useVirtualDestSubs="true" duplex="true" uri="static:(tcp:// 127.0.0.1:61616)" userName="omitted" password="omitted" > <staticallyIncludedDestinations> <topic physicalName="include.test.bar?forceDurable=true"/> </staticallyIncludedDestinations> </networkConnector> </networkConnectors> On Mon, Nov 11, 2019 at 8:33 AM Tim Bain <tb...@alumni.duke.edu> wrote: > OK, thanks for sharing what you did. > > Tim > > On Mon, Nov 11, 2019, 2:05 AM Cannock, Richard < > richard.cann...@edfenergy.com> wrote: > > > HI Tim > > > > Thanks for the advice. In the end, I got the network connector 'promoted' > > to a durable topic subscriber by adding a temporary Topic to the forward > > declaration like this: > > > > <destinationInterceptors> > > <virtualDestinationInterceptor> > > <virtualDestinations> > > <compositeTopic forwardOnly="true" name="myTopic"> > > <forwardTo> > > <queue physicalName="myqyeue"/> > > <topic physicalName="VirtualTopic.myTopic"/> > > </forwardTo> > > </compositeTopic> > > </virtualDestinations> > > </virtualDestinationInterceptor> > > </destinationInterceptors> > > > > Before attaching a durable scubscriber to this topic. Once restarted the > > broker made the network connector durable. > > > > I then removed the <topic physicalName="VirtualTopic.myTopic"/> and > > restarted the broker again. The network connector remained durable and > > messages remained to be sent to the queue even when the broker was > offline. > > > > This is clearly a workaround, so a bit nervous of it's reliability, so > > clearly an enhancement request to make this a duranble subscription might > > be best. > > > > Thanks > > > > Richard > > > > -----Original Message----- > > From: Tim Bain <tb...@alumni.duke.edu> > > Sent: 08 November 2019 07:00 > > To: ActiveMQ Users <users@activemq.apache.org> > > Subject: Re: compositeTopic and lost messages - broker restart > > > > Durable topic subscriptions are dangerous things, because if the consumer > > disappears without removing the subscription, the broker will keep the > > messages for it forever, eventually running the broker out of memory. A > > durable topic subscription between brokers (for implementing a composite > > destination) is doubly so, since the subscription isn't held by a > > traditional consumer who might give assurances of cleaning things up > before > > tearing itself down. So it's no surprise to me that a non-durable > > subscription is the default behavior, since that's the safer behavior. > > > > However, I think maybe your real question isn't why this isn't the > > default, but why there isn't an option to specify the use of a durable > > subscription if you so choose. Your use case seems like a valid one for > > that particular feature, so if you submit an enhancement request, perhaps > > it could be added in a future version. > > > > In the meantime, could you make the composite topic in your broker use a > > different name than the remote topic (call it *myCompositeTopic*), and > then > > have an embedded Camel route that uses a durable subscription to consume > > from *myCompositeTopic* and publishes the message to *sourceTopic*? Then > > the network of brokers can ensure that there's a durable subscription on > > the remote broker, and you'll still get the composite destination on your > > local broker. This has the additional benefit of not having *sourceTopic* > > be a composite destination on your local broker while being a > non-composite > > destination on the remote broker, which is a pretty wonky configuration > > anyway. > > > > Tim > > > > On Fri, Oct 18, 2019 at 2:05 AM Cannock, Richard < > > richard.cann...@edfenergy.com> wrote: > > > > > HI, > > > > > > Further investigations indicate that the resulting network connector > > > establishes only a non durable topic subscription to the sourceTopic > > > via the CompositeTopic, which is why we do not get any messages queued > > > up at source and subsequently delivered received post the networked > > > broker restart. > > > > > > However, this does not explain why this is only a non durable topic > > > subscription, as I believe this should be a durable topic subscription > > > by default. > > > > > > I have tried this with various ActiveMQ versions upto 5.15.9 and the > > > behaviour is consistent. > > > > > > Any advice? > > > > > > Thanks > > > > > > Richard > > > > > > -----Original Message----- > > > From: Cannock, Richard <richard.cann...@edfenergy.com> > > > Sent: 16 October 2019 19:36 > > > To: users@activemq.apache.org > > > Subject: RE: compositeTopic and lost messages - broker restart > > > > > > Apologies, mail server issue meant I wasn't sure that the original > > > message had gotten through. Obviously it had! > > > > > > These are the same issue so only looking for advice on one of them, > > > thanks. > > > > > > _______ > > > From: Justin Bertram [jbert...@apache.org] > > > Sent: 16 October 2019 17:09 > > > To: users@activemq.apache.org > > > Subject: Re: compositeTopic and lost messages - broker restart > > > > > > Richard, didn't you just send this same email to the ActiveMQ user > > > list (although with a slightly different subject)? Just want to > > > clarify if they are the same or different issues. > > > > > > > > > Justin > > > > > > On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard < > > > richard.cann...@edfenergy.com> wrote: > > > > > > > Hi, > > > > > > > > > > > > > > > > We are trying to enlist in a network of brokers arrangement with a > > > > third party broker hosted externally to us. We wish to receive > > > > messages from a topic (called *sourceTopic*) and forward to internal > > > > queues on our local broker for load balancing and throughput > > > > purposes using virtual destinations, a compositeTopic forwarding to > > > > two local queues on our local broker. > > > > > > > > > > > > > > > > In order to avoid implementing ingress firewall rules for the third > > > > party onto our estate, we have created a duplex network connector so > > > > that we initiate the outbound tcp connection. We have excluded our > > > > local broker queues from propagating to the other broker using the > > > > <excludedDestinations> configuration element. > > > > > > > > > > > > > > > > Our configuration below works fine, as long as our local broker is > > > > up, and we have an active consumer. In this case, messages published > > > > to the third party broker *sourceTopic* are successfully received on > > > > our composite topic forwarded queues, test1 and test2. To avoid any > > > > confusion, the source message producer publishing to *sourceTopic > > > > *is explicitly using PersistentDelivery (even though I believe that > > > > is the > > > default anyway). > > > > > > > > > > > > > > > > If we restart our broker, any messages that were published to the > > > > *sourceTopic* in the intervening period that our broker was down, > > > > are not received onto local forwarded queues test and test2. > > > > > > > > > > > > > > > > It would appear that the network connector configuration and\or > > > > compositeTopic do not result in a durable topic subscription to the > > > > *sourceTopic* on the other broker. > > > > > > > > > > > > > > > > I can also successfully recreate this behaviour with two local > > > > instances of Active MQ on my development machine running side by > > > > side with broker B duplex connected to broker A, so it doesn't > > > > appear to be an issue with the third party broker either. > > > > > > > > > > > > > > > > Our configuration (sensitive data removed) is as below: > > > > > > > > > > > > > > > > Any ideas on how we can resolve this? > > > > > > > > > > > > > > > > Many thanks > > > > > > > > > > > > Richard > > > > > > > > > > > > > > > > <beans > > > > > > > > xmlns="http://www.springframework.org/schema/beans" > > > > > > > > xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" > > > > > > > > xsi:schemaLocation="http://www.springframework.org/schema/beans > > > > http://www.springframework.org/schema/beans/spring-beans.xsd > > > > > > > > http://activemq.apache.org/schema/core > > > > http://activemq.apache.org/schema/core/activemq-core.xsd"> > > > > > > > > > > > > > > > > <!-- Allows us to use system properties as variables in this > > > > configuration file --> > > > > > > > > <bean > > > > class="org.springframework.beans.factory.config.PropertyPlaceholderC > > > > on > > > > figurer"> > > > > > > > > <property name="locations"> > > > > > > > > > > > > <value>file:${activemq.conf}/credentials.properties</value> > > > > > > > > </property> > > > > > > > > </bean> > > > > > > > > > > > > > > > > <!-- Allows accessing the server log --> > > > > > > > > <bean id="logQuery" > > > class="io.fabric8.insight.log.log4j.Log4jLogQuery" > > > > > > > > lazy-init="false" scope="singleton" > > > > > > > > init-method="start" destroy-method="stop"> > > > > > > > > </bean> > > > > > > > > > > > > > > > > <!-- > > > > > > > > The <broker> element is used to configure the ActiveMQ > broker. > > > > > > > > --> > > > > > > > > <broker xmlns="http://activemq.apache.org/schema/core" > > > > brokerName="localhost" dataDirectory="${activemq.data}" > > > > useVirtualDestSubs="true"> > > > > > > > > > > > > > > > > <destinationPolicy> > > > > > > > > <policyMap> > > > > > > > > <policyEntries> > > > > > > > > <policyEntry topic=">" > > > > > > > > > <!-- The constantPendingMessageLimitStrategy is > > > > used to prevent > > > > > > > > slow topic consumers to block producers and > > > > affect other consumers > > > > > > > > by limiting the number of messages that are > > > > retained > > > > > > > > For more information, see: > > > > > > > > > > > > > > > > > > > > http://activemq.apache.org/slow-consumer-handling.html > > > > > > > > > > > > > > > > --> > > > > > > > > <pendingMessageLimitStrategy> > > > > > > > > <constantPendingMessageLimitStrategy > > > > limit="1000"/> > > > > > > > > </pendingMessageLimitStrategy> > > > > > > > > </policyEntry> > > > > > > > > > > > > <policyEntry queue="test" enableAudit="false"> > > > > > > > > > > > > <networkBridgeFilterFactory> > > > > > > > > > > > > <conditionalNetworkBridgeFilterFactory > > > > replayWhenNoConsumers="true"/> > > > > > > > > > > > > </networkBridgeFilterFactory> > > > > > > > > > > > > </policyEntry> > > > > > > > > > > > > <policyEntry topic="*sourceTopic*" enableAudit="false"> > > > > > > > > > > > > <networkBridgeFilterFactory> > > > > > > > > > > > > <conditionalNetworkBridgeFilterFactory > > > > replayWhenNoConsumers="true"/> > > > > > > > > > > > > </networkBridgeFilterFactory> > > > > > > > > > > > > </policyEntry> > > > > > > > > </policyEntries> > > > > > > > > </policyMap> > > > > > > > > </destinationPolicy> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > <destinationInterceptors> > > > > > > > > > > > > <virtualDestinationInterceptor> > > > > > > > > > > > > <virtualDestinations> > > > > > > > > > > > > <compositeTopic forwardOnly="true" name="* sourceTopic*"> > > > > > > > > > > > > <forwardTo> > > > > > > > > > > > > <queue physicalName="test" /> > > > > > > > > > > > > > > > > <queue physicalName="test2" /> > > > > > > > > > > > > </forwardTo> > > > > > > > > > > > > </compositeTopic> > > > > > > > > > > > > </virtualDestinations> > > > > > > > > > > > > </virtualDestinationInterceptor> > > > > > > > > </destinationInterceptors> > > > > > > > > > > > > > > > > <!-- > > > > > > > > The managementContext is used to configure how ActiveMQ > > > > is exposed in > > > > > > > > JMX. By default, ActiveMQ uses the MBean server that is > > > > started by > > > > > > > > the JVM. For more information, see: > > > > > > > > > > > > > > > > http://activemq.apache.org/jmx.html > > > > > > > > --> > > > > > > > > <managementContext> > > > > > > > > <managementContext createConnector="false"/> > > > > > > > > </managementContext> > > > > > > > > > > > > > > > > <!-- > > > > > > > > Configure message persistence for the broker. The > > > > default persistence > > > > > > > > mechanism is the KahaDB store (identified by the kahaDB > > tag). > > > > > > > > For more information, see: > > > > > > > > > > > > > > > > http://activemq.apache.org/persistence.html > > > > > > > > --> > > > > > > > > <persistenceAdapter> > > > > > > > > <kahaDB directory="${activemq.data}/kahadb"/> > > > > > > > > </persistenceAdapter> > > > > > > > > > > > > > > > > > > > > > > > > <!-- > > > > > > > > The systemUsage controls the maximum amount of space the > > > > broker will > > > > > > > > use before disabling caching and/or slowing down > producers. > > > > For more information, see: > > > > > > > > http://activemq.apache.org/producer-flow-control.html > > > > > > > > --> > > > > > > > > <systemUsage> > > > > > > > > <systemUsage> > > > > > > > > <memoryUsage> > > > > > > > > <memoryUsage percentOfJvmHeap="70" /> > > > > > > > > </memoryUsage> > > > > > > > > <storeUsage> > > > > > > > > <storeUsage limit="100 gb"/> > > > > > > > > </storeUsage> > > > > > > > > <tempUsage> > > > > > > > > <tempUsage limit="50 gb"/> > > > > > > > > </tempUsage> > > > > > > > > </systemUsage> > > > > > > > > </systemUsage> > > > > > > > > > > > > > > > > <!-- > > > > > > > > The transport connectors expose ActiveMQ over a given > > > > protocol to > > > > > > > > clients and other brokers. For more information, see: > > > > > > > > > > > > > > > > http://activemq.apache.org/configuring-transports.html > > > > > > > > --> > > > > > > > > <transportConnectors> > > > > > > > > <!-- DOS protection, limit concurrent connections to > > > > 1000 and frame size to 100MB --> > > > > > > > > <transportConnector name="openwire" uri="tcp:// > > > > 0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=10 > > > > 48 > > > > 57600 > > > > "/> > > > > > > > > <!-- > > > > > > > > <transportConnector > > > > name="amqp" uri="amqp:// > > > > 0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104 > > > > 85 > > > > 7600 > > > > "/> > > > > > > > > <transportConnector name="stomp" uri="stomp:// > > > > 0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=10 > > > > 48 > > > > 57600 > > > > "/> > > > > > > > > <transportConnector name="mqtt" uri="mqtt:// > > > > 0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104 > > > > 85 > > > > 7600 > > > > "/> > > > > > > > > <transportConnector name="ws" uri="ws:// > > > > 0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=10 > > > > 48 > > > > 57600 > > > > "/> > > > > > > > > --> > > > > > > > > </transportConnectors> > > > > > > > > > > > > > > > > <!-- destroy the spring context on shutdown to stop jetty > > > > --> > > > > > > > > <shutdownHooks> > > > > > > > > <bean xmlns="http://www.springframework.org/schema/beans > " > > > > class="org.apache.activemq.hooks.SpringContextHook" /> > > > > > > > > </shutdownHooks> > > > > > > > > > > > > > > > > <networkConnectors> > > > > > > > > <networkConnector > > > > name="edf-to-thirdpartybroker-network-connector" > > > > > > > > > > useVirtualDestSubs="true" > > > > > > > > duplex="true" > > > > > > > > uri="static:(tcp:// > > > > 127.0.0.1:61616)" userName="omitted" password="omitted" > > > > > > > > > > > > > <staticallyIncludedDestinations> > > > > > > > > > > > > <topic physicalName="*sourceTopic*"/> > > > > > > > > > > > > </staticallyIncludedDestinations> > > > > > > > > > > > > <excludedDestinations> > > > > > > > > > > > > <queue physicalName=">"/> > > > > > > > > > > > > </excludedDestinations> > > > > > > > > > > > > </networkConnector> > > > > > > > > </networkConnectors> > > > > > > > > > > > > > > > > <plugins> > > > > > > > > > > > > <simpleAuthenticationPlugin> > > > > > > > > <users> > > > > > > > > > > > > <authenticationUser username="system" password="omitted" > > > > groups="users,admins"/> > > > > > > > > > > > > <authenticationUser username="queueAUser" password="omitted" > > > > groups="queueAusers"/> > > > > > > > > > > > > <authenticationUser username="queueBUser" password="omitted" > > > > groups="queueBusers"/> > > > > > > > > </users> > > > > > > > > > > > > </simpleAuthenticationPlugin> > > > > > > > > > > > > > > > > > > > > <authorizationPlugin> > > > > > > > > <map> > > > > > > > > <authorizationMap> > > > > > > > > <authorizationEntries> > > > > > > > > <authorizationEntry queue=">" read="admins" > > write="admins" > > > > admin="admins" /> > > > > > > > > <authorizationEntry queue="test" read="queueAusers" > > > > write="queueAusers" admin="queueAUsers" /> > > > > > > > > > > > > <authorizationEntry queue="test2" read="queueBusers" > > write="queueBusers" > > > > admin="queueBusers" /> > > > > > > > > > > > > > > > > <authorizationEntry topic=">" read="admins" > > write="admins" > > > > admin="admins" /> > > > > > > > > > > > > > > > > > > > > > > > > <authorizationEntry topic="ActiveMQ.Advisory.>" > > > > read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers" > > > > admin="admins,queueAusers,queueBusers" /> > > > > > > > > </authorizationEntries> > > > > > > > > > > > > > > > > <!-- let's assign roles to temporary destinations. > > > > comment this entry if we don't want any roles assigned to temp > > > > destinations > > > > --> > > > > > > > > > > > > > > > > </authorizationMap> > > > > > > > > </map> > > > > > > > > </authorizationPlugin> > > > > > > > > > > > > > > > > > > > > > > > > </plugins> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > </broker> > > > > > > > > > > > > > > > > <!-- > > > > > > > > Enable web consoles, REST and Ajax APIs and demos > > > > > > > > The web consoles requires by default login, you can disable > > > > this in the jetty.xml file > > > > > > > > > > > > > > > > Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more > > > > details > > > > > > > > --> > > > > > > > > <import resource="jetty.xml"/> > > > > > > > > > > > > > > > > </beans> > > > > > > > > <!-- END SNIPPET: example --> > > > > > > > > > > > > > > > > *Richard Cannock* > > > > > > > > *Solution Architect* > > > > > > > > Business Change and IT > > > > > > > > Customers > > > > > > > > > > > > > > > > *T:* +44 (0) 2081862465 > > > > > > > > *M:* +44 (0)7729 320505 > > > > > > > > > > > > > > > > Barnwood > > > > > > > > Barnett Way > > > > > > > > Gloucester, Gloucestershire, GL4 3RS > > > > > > > > > > > > > > > > [image: cid:4E18CA00-3CA9-4B94-970C-09C6DB028C24] > > > > > > > > > > > > > > > > edfenergy.com <http://www.edfenergy.com/> > > > > > > > > > > > > > > > > Please consider the environment before printing this email > > > > > > > > > > > > > > > > This e-mail and any files transmitted with it are confidential and > > > > may be protected by legal privilege. If you are not the intended > > > > recipient, please notify the sender and delete the e-mail from your > > > system. > > > > This e-mail has been scanned for malicious content but the internet > > > > is inherently insecure and EDF Energy Limited can not accept any > > > > liability for the integrity of this message or its attachments. > > > > No employee or agent of EDF Energy Limited or any related company is > > > > authorised to conclude any binding agreement on behalf of EDF Energy > > > > Limited or any related company by e-mail. > > > > > > > > All e-mails sent and received by EDF Energy Limited are monitored to > > > > ensure compliance with the company's information security policy. > > > > Executable and script files are not permitted through the EDF Energy > > > > Limited mail gateway. EDF Energy does not accept or send mails > > > > above > > > > 30 Mb in size. > > > > > > > > EDF Energy Limited > > > > Registered in England and Wales No. 2366852 Registered Office: 90 > > > > Whitfield Street, London W1T 4EZ > > > > > > > > > > > > > This e-mail and any files transmitted with it are confidential and may > > > be protected by legal privilege. If you are not the intended > > > recipient, please notify the sender and delete the e-mail from your > > system. > > > This e-mail has been scanned for malicious content but the internet is > > > inherently insecure and EDF Energy Limited cannot accept any liability > > > for the integrity of this message or its attachments. No employee or > > > agent of EDF Energy Limited or any related company is authorised to > > > conclude any binding agreement on behalf of EDF Energy Limited or any > > > related company by e-mail. > > > All e-mails sent and received by EDF Energy Limited are monitored to > > > ensure compliance with the company's information security policy. > > > Executable and script files are not permitted through the EDF Energy > > > Limited mail gateway. EDF Energy does not accept or send mails above > > > 30 Mb in size. > > > > > > EDF Energy Limited Registered in England and Wales No. 2366852 > > > Registered > > > Office: 90 Whitfield Street, London W1T 4EZ > > > > > > > > > This e-mail and any files transmitted with it are confidential and may be > > protected by legal privilege. If you are not the intended recipient, > please > > notify the sender and delete the e-mail from your system. > > This e-mail has been scanned for malicious content but the internet is > > inherently insecure and EDF Energy Limited cannot accept any liability > for > > the integrity of this message or its attachments. No employee or agent of > > EDF Energy Limited or any related company is authorised to conclude any > > binding agreement on behalf of EDF Energy Limited or any related company > by > > e-mail. > > All e-mails sent and received by EDF Energy Limited are monitored to > > ensure compliance with the company's information security policy. > > Executable and script files are not permitted through the EDF Energy > > Limited mail gateway. EDF Energy does not accept or send mails above 30 > Mb > > in size. > > > > EDF Energy Limited Registered in England and Wales No. 2366852 Registered > > Office: 90 Whitfield Street, London W1T 4EZ > > >