I put a lot of work into the network bridging in 5.x, and it looks like I never documented some of it, including the forceDurable flag and the flag that synchronizes durable subscriptions when a bridge reconnects. I did document the virtual destinations demand feature, which seems relevant here. I will go back and update the network of brokers page with the new flags and features.
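In the meantime, here is a rough sketch of how the two flags might be combined on a network connector. Treat it as an illustration under assumptions rather than reference configuration: `syncDurableSubs` is the attribute name as I recall it from the 5.15.x network connector options and should be verified against your broker version, and the connector name, URI, and `sourceTopic` destination are simply borrowed from the configuration quoted in this thread.

```xml
<networkConnectors>
  <!-- Sketch only. syncDurableSubs is assumed to be the attribute that
       re-synchronizes durable subscriptions when the bridge reconnects. -->
  <networkConnector name="edf-to-thirdpartybroker-network-connector"
                    uri="static:(tcp://127.0.0.1:61616)"
                    duplex="true"
                    useVirtualDestSubs="true"
                    syncDurableSubs="true">
    <staticallyIncludedDestinations>
      <!-- forceDurable=true asks the bridge to use a durable subscription
           for this topic instead of the default non-durable one. -->
      <topic physicalName="sourceTopic?forceDurable=true"/>
    </staticallyIncludedDestinations>
  </networkConnector>
</networkConnectors>
```

Keep in mind that a forced durable subscription makes the remote broker retain messages while the bridge is down, so store limits on that broker need to allow for the backlog.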
For some tests that show the features see:

https://github.com/apache/activemq/blob/activemq-5.15.10/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
https://github.com/apache/activemq/blob/activemq-5.15.10/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
https://github.com/apache/activemq/blob/activemq-5.15.10/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java

On Mon, Nov 11, 2019 at 1:17 PM Christopher Shannon <christopher.l.shan...@gmail.com> wrote:

> You can force durable subscriptions by setting a flag. Looks like this
> may not have made it into the documentation.
>
> <networkConnectors>
>   <networkConnector name="edf-to-thirdpartybroker-network-connector"
>                     useVirtualDestSubs="true"
>                     duplex="true"
>                     uri="static:(tcp://127.0.0.1:61616)"
>                     userName="omitted" password="omitted">
>     <staticallyIncludedDestinations>
>       <topic physicalName="include.test.bar?forceDurable=true"/>
>     </staticallyIncludedDestinations>
>   </networkConnector>
> </networkConnectors>
>
> On Mon, Nov 11, 2019 at 8:33 AM Tim Bain <tb...@alumni.duke.edu> wrote:
>
>> OK, thanks for sharing what you did.
>>
>> Tim
>>
>> On Mon, Nov 11, 2019, 2:05 AM Cannock, Richard <
>> richard.cann...@edfenergy.com> wrote:
>>
>> > Hi Tim
>> >
>> > Thanks for the advice.
>> > In the end, I got the network connector 'promoted'
>> > to a durable topic subscriber by adding a temporary Topic to the forward
>> > declaration like this:
>> >
>> > <destinationInterceptors>
>> >   <virtualDestinationInterceptor>
>> >     <virtualDestinations>
>> >       <compositeTopic forwardOnly="true" name="myTopic">
>> >         <forwardTo>
>> >           <queue physicalName="myqyeue"/>
>> >           <topic physicalName="VirtualTopic.myTopic"/>
>> >         </forwardTo>
>> >       </compositeTopic>
>> >     </virtualDestinations>
>> >   </virtualDestinationInterceptor>
>> > </destinationInterceptors>
>> >
>> > This was done before attaching a durable subscriber to this topic. Once
>> > restarted, the broker made the network connector durable.
>> >
>> > I then removed the <topic physicalName="VirtualTopic.myTopic"/> and
>> > restarted the broker again. The network connector remained durable and
>> > messages were retained for delivery to the queue even while the broker
>> > was offline.
>> >
>> > This is clearly a workaround, so I am a bit nervous about its
>> > reliability; an enhancement request to make this a durable subscription
>> > might be best.
>> >
>> > Thanks
>> >
>> > Richard
>> >
>> > -----Original Message-----
>> > From: Tim Bain <tb...@alumni.duke.edu>
>> > Sent: 08 November 2019 07:00
>> > To: ActiveMQ Users <users@activemq.apache.org>
>> > Subject: Re: compositeTopic and lost messages - broker restart
>> >
>> > Durable topic subscriptions are dangerous things, because if the consumer
>> > disappears without removing the subscription, the broker will keep the
>> > messages for it forever, eventually running the broker out of memory. A
>> > durable topic subscription between brokers (for implementing a composite
>> > destination) is doubly so, since the subscription isn't held by a
>> > traditional consumer who might give assurances of cleaning things up
>> > before tearing itself down. So it's no surprise to me that a non-durable
>> > subscription is the default behavior, since that's the safer behavior.
>> >
>> > However, I think maybe your real question isn't why this isn't the
>> > default, but why there isn't an option to specify the use of a durable
>> > subscription if you so choose. Your use case seems like a valid one for
>> > that particular feature, so if you submit an enhancement request,
>> > perhaps it could be added in a future version.
>> >
>> > In the meantime, could you make the composite topic in your broker use a
>> > different name than the remote topic (call it *myCompositeTopic*), and
>> > then have an embedded Camel route that uses a durable subscription to
>> > consume from *myCompositeTopic* and publishes the message to
>> > *sourceTopic*? Then the network of brokers can ensure that there's a
>> > durable subscription on the remote broker, and you'll still get the
>> > composite destination on your local broker. This has the additional
>> > benefit of not having *sourceTopic* be a composite destination on your
>> > local broker while being a non-composite destination on the remote
>> > broker, which is a pretty wonky configuration anyway.
>> >
>> > Tim
>> >
>> > On Fri, Oct 18, 2019 at 2:05 AM Cannock, Richard <
>> > richard.cann...@edfenergy.com> wrote:
>> >
>> > > Hi,
>> > >
>> > > Further investigations indicate that the resulting network connector
>> > > establishes only a non-durable topic subscription to the sourceTopic
>> > > via the CompositeTopic, which is why no messages are queued up at the
>> > > source and then delivered after the networked broker restarts.
>> > >
>> > > However, this does not explain why this is only a non-durable topic
>> > > subscription, as I believe this should be a durable topic subscription
>> > > by default.
>> > >
>> > > I have tried this with various ActiveMQ versions up to 5.15.9 and the
>> > > behaviour is consistent.
>> > >
>> > > Any advice?
>> > >
>> > > Thanks
>> > >
>> > > Richard
>> > >
>> > > -----Original Message-----
>> > > From: Cannock, Richard <richard.cann...@edfenergy.com>
>> > > Sent: 16 October 2019 19:36
>> > > To: users@activemq.apache.org
>> > > Subject: RE: compositeTopic and lost messages - broker restart
>> > >
>> > > Apologies, mail server issue meant I wasn't sure that the original
>> > > message had gotten through. Obviously it had!
>> > >
>> > > These are the same issue so only looking for advice on one of them,
>> > > thanks.
>> > >
>> > > _______
>> > > From: Justin Bertram [jbert...@apache.org]
>> > > Sent: 16 October 2019 17:09
>> > > To: users@activemq.apache.org
>> > > Subject: Re: compositeTopic and lost messages - broker restart
>> > >
>> > > Richard, didn't you just send this same email to the ActiveMQ user
>> > > list (although with a slightly different subject)? Just want to
>> > > clarify if they are the same or different issues.
>> > >
>> > > Justin
>> > >
>> > > On Wed, Oct 16, 2019 at 10:40 AM Cannock, Richard <
>> > > richard.cann...@edfenergy.com> wrote:
>> > >
>> > > > Hi,
>> > > >
>> > > > We are trying to enlist in a network of brokers arrangement with a
>> > > > third party broker hosted externally to us. We wish to receive
>> > > > messages from a topic (called *sourceTopic*) and forward to internal
>> > > > queues on our local broker for load balancing and throughput
>> > > > purposes using virtual destinations, a compositeTopic forwarding to
>> > > > two local queues on our local broker.
>> > > >
>> > > > In order to avoid implementing ingress firewall rules for the third
>> > > > party onto our estate, we have created a duplex network connector so
>> > > > that we initiate the outbound tcp connection. We have excluded our
>> > > > local broker queues from propagating to the other broker using the
>> > > > <excludedDestinations> configuration element.
>> > > >
>> > > > Our configuration below works fine, as long as our local broker is
>> > > > up, and we have an active consumer. In this case, messages published
>> > > > to the third party broker *sourceTopic* are successfully received on
>> > > > our composite topic forwarded queues, test and test2. To avoid any
>> > > > confusion, the source message producer publishing to *sourceTopic*
>> > > > is explicitly using PersistentDelivery (even though I believe that
>> > > > is the default anyway).
>> > > >
>> > > > If we restart our broker, any messages that were published to the
>> > > > *sourceTopic* in the intervening period that our broker was down
>> > > > are not received onto local forwarded queues test and test2.
>> > > >
>> > > > It would appear that the network connector configuration and/or
>> > > > compositeTopic do not result in a durable topic subscription to the
>> > > > *sourceTopic* on the other broker.
>> > > >
>> > > > I can also successfully recreate this behaviour with two local
>> > > > instances of ActiveMQ on my development machine running side by
>> > > > side with broker B duplex connected to broker A, so it doesn't
>> > > > appear to be an issue with the third party broker either.
>> > > >
>> > > > Our configuration (sensitive data removed) is as below:
>> > > >
>> > > > Any ideas on how we can resolve this?
>> > > >
>> > > > Many thanks
>> > > >
>> > > > Richard
>> > > >
>> > > > <beans
>> > > >   xmlns="http://www.springframework.org/schema/beans"
>> > > >   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>> > > >   xsi:schemaLocation="http://www.springframework.org/schema/beans
>> > > >     http://www.springframework.org/schema/beans/spring-beans.xsd
>> > > >     http://activemq.apache.org/schema/core
>> > > >     http://activemq.apache.org/schema/core/activemq-core.xsd">
>> > > >
>> > > >   <!-- Allows us to use system properties as variables in this
>> > > >        configuration file -->
>> > > >   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>> > > >     <property name="locations">
>> > > >       <value>file:${activemq.conf}/credentials.properties</value>
>> > > >     </property>
>> > > >   </bean>
>> > > >
>> > > >   <!-- Allows accessing the server log -->
>> > > >   <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
>> > > >         lazy-init="false" scope="singleton"
>> > > >         init-method="start" destroy-method="stop">
>> > > >   </bean>
>> > > >
>> > > >   <!--
>> > > >     The <broker> element is used to configure the ActiveMQ broker.
>> > > >   -->
>> > > >   <broker xmlns="http://activemq.apache.org/schema/core"
>> > > >           brokerName="localhost" dataDirectory="${activemq.data}"
>> > > >           useVirtualDestSubs="true">
>> > > >
>> > > >     <destinationPolicy>
>> > > >       <policyMap>
>> > > >         <policyEntries>
>> > > >           <policyEntry topic=">" >
>> > > >             <!-- The constantPendingMessageLimitStrategy is used to prevent
>> > > >                  slow topic consumers to block producers and affect other consumers
>> > > >                  by limiting the number of messages that are retained
>> > > >                  For more information, see:
>> > > >
>> > > >                  http://activemq.apache.org/slow-consumer-handling.html
>> > > >             -->
>> > > >             <pendingMessageLimitStrategy>
>> > > >               <constantPendingMessageLimitStrategy limit="1000"/>
>> > > >             </pendingMessageLimitStrategy>
>> > > >           </policyEntry>
>> > > >           <policyEntry queue="test" enableAudit="false">
>> > > >             <networkBridgeFilterFactory>
>> > > >               <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>> > > >             </networkBridgeFilterFactory>
>> > > >           </policyEntry>
>> > > >           <policyEntry topic="sourceTopic" enableAudit="false">
>> > > >             <networkBridgeFilterFactory>
>> > > >               <conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
>> > > >             </networkBridgeFilterFactory>
>> > > >           </policyEntry>
>> > > >         </policyEntries>
>> > > >       </policyMap>
>> > > >     </destinationPolicy>
>> > > >
>> > > >     <destinationInterceptors>
>> > > >       <virtualDestinationInterceptor>
>> > > >         <virtualDestinations>
>> > > >           <compositeTopic forwardOnly="true" name="sourceTopic">
>> > > >             <forwardTo>
>> > > >               <queue physicalName="test" />
>> > > >               <queue physicalName="test2" />
>> > > >             </forwardTo>
>> > > >           </compositeTopic>
>> > > >         </virtualDestinations>
>> > > >       </virtualDestinationInterceptor>
>> > > >     </destinationInterceptors>
>> > > >
>> > > >     <!--
>> > > >       The managementContext is used to configure how ActiveMQ is exposed in
>> > > >       JMX. By default, ActiveMQ uses the MBean server that is started by
>> > > >       the JVM. For more information, see:
>> > > >
>> > > >       http://activemq.apache.org/jmx.html
>> > > >     -->
>> > > >     <managementContext>
>> > > >       <managementContext createConnector="false"/>
>> > > >     </managementContext>
>> > > >
>> > > >     <!--
>> > > >       Configure message persistence for the broker. The default persistence
>> > > >       mechanism is the KahaDB store (identified by the kahaDB tag).
>> > > >       For more information, see:
>> > > >
>> > > >       http://activemq.apache.org/persistence.html
>> > > >     -->
>> > > >     <persistenceAdapter>
>> > > >       <kahaDB directory="${activemq.data}/kahadb"/>
>> > > >     </persistenceAdapter>
>> > > >
>> > > >     <!--
>> > > >       The systemUsage controls the maximum amount of space the broker will
>> > > >       use before disabling caching and/or slowing down producers.
>> > > >       For more information, see:
>> > > >       http://activemq.apache.org/producer-flow-control.html
>> > > >     -->
>> > > >     <systemUsage>
>> > > >       <systemUsage>
>> > > >         <memoryUsage>
>> > > >           <memoryUsage percentOfJvmHeap="70" />
>> > > >         </memoryUsage>
>> > > >         <storeUsage>
>> > > >           <storeUsage limit="100 gb"/>
>> > > >         </storeUsage>
>> > > >         <tempUsage>
>> > > >           <tempUsage limit="50 gb"/>
>> > > >         </tempUsage>
>> > > >       </systemUsage>
>> > > >     </systemUsage>
>> > > >
>> > > >     <!--
>> > > >       The transport connectors expose ActiveMQ over a given protocol to
>> > > >       clients and other brokers. For more information, see:
>> > > >
>> > > >       http://activemq.apache.org/configuring-transports.html
>> > > >     -->
>> > > >     <transportConnectors>
>> > > >       <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
>> > > >       <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>> > > >       <!--
>> > > >       <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>> > > >       <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>> > > >       <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>> > > >       <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>> > > >       -->
>> > > >     </transportConnectors>
>> > > >
>> > > >     <!-- destroy the spring context on shutdown to stop jetty -->
>> > > >     <shutdownHooks>
>> > > >       <bean xmlns="http://www.springframework.org/schema/beans"
>> > > >             class="org.apache.activemq.hooks.SpringContextHook" />
>> > > >     </shutdownHooks>
>> > > >
>> > > >     <networkConnectors>
>> > > >       <networkConnector name="edf-to-thirdpartybroker-network-connector"
>> > > >                         useVirtualDestSubs="true"
>> > > >                         duplex="true"
>> > > >                         uri="static:(tcp://127.0.0.1:61616)"
>> > > >                         userName="omitted" password="omitted" >
>> > > >         <staticallyIncludedDestinations>
>> > > >           <topic physicalName="sourceTopic"/>
>> > > >         </staticallyIncludedDestinations>
>> > > >         <excludedDestinations>
>> > > >           <queue physicalName=">"/>
>> > > >         </excludedDestinations>
>> > > >       </networkConnector>
>> > > >     </networkConnectors>
>> > > >
>> > > >     <plugins>
>> > > >       <simpleAuthenticationPlugin>
>> > > >         <users>
>> > > >           <authenticationUser username="system" password="omitted" groups="users,admins"/>
>> > > >           <authenticationUser username="queueAUser" password="omitted" groups="queueAusers"/>
>> > > >           <authenticationUser username="queueBUser" password="omitted" groups="queueBusers"/>
>> > > >         </users>
>> > > >       </simpleAuthenticationPlugin>
>> > > >
>> > > >       <authorizationPlugin>
>> > > >         <map>
>> > > >           <authorizationMap>
>> > > >             <authorizationEntries>
>> > > >               <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
>> > > >               <authorizationEntry queue="test" read="queueAusers" write="queueAusers" admin="queueAUsers" />
>> > > >               <authorizationEntry queue="test2" read="queueBusers" write="queueBusers" admin="queueBusers" />
>> > > >               <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
>> > > >               <authorizationEntry topic="ActiveMQ.Advisory.>" read="queueAusers,queueBusers" write="admins,queueAusers,queueBusers" admin="admins,queueAusers,queueBusers" />
>> > > >             </authorizationEntries>
>> > > >
>> > > >             <!-- let's assign roles to temporary destinations. comment this
>> > > >                  entry if we don't want any roles assigned to temp destinations -->
>> > > >           </authorizationMap>
>> > > >         </map>
>> > > >       </authorizationPlugin>
>> > > >     </plugins>
>> > > >   </broker>
>> > > >
>> > > >   <!--
>> > > >     Enable web consoles, REST and Ajax APIs and demos
>> > > >     The web consoles requires by default login, you can disable this in the jetty.xml file
>> > > >
>> > > >     Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>> > > >   -->
>> > > >   <import resource="jetty.xml"/>
>> > > >
>> > > > </beans>
>> > > >
>> > > > <!-- END SNIPPET: example -->
>> > > >
>> > > > *Richard Cannock*
>> > > > *Solution Architect*
>> > > > Business Change and IT
>> > > > Customers
>> > > >
>> > > > *T:* +44 (0) 2081862465
>> > > > *M:* +44 (0)7729 320505
>> > > >
>> > > > Barnwood
>> > > > Barnett Way
>> > > > Gloucester, Gloucestershire, GL4 3RS
>> > > >
>> > > > edfenergy.com <http://www.edfenergy.com/>
>> > > >
>> > > > Please consider the environment before printing this email
>> > > >
>> > > > This e-mail and any files transmitted with it are confidential and
>> > > > may be protected by legal privilege. If you are not the intended
>> > > > recipient, please notify the sender and delete the e-mail from your
>> > > > system.
>> > > > This e-mail has been scanned for malicious content but the internet
>> > > > is inherently insecure and EDF Energy Limited can not accept any
>> > > > liability for the integrity of this message or its attachments.
>> > > > No employee or agent of EDF Energy Limited or any related company is
>> > > > authorised to conclude any binding agreement on behalf of EDF Energy
>> > > > Limited or any related company by e-mail.
>> > > >
>> > > > All e-mails sent and received by EDF Energy Limited are monitored to
>> > > > ensure compliance with the company's information security policy.
>> > > > Executable and script files are not permitted through the EDF Energy
>> > > > Limited mail gateway. EDF Energy does not accept or send mails
>> > > > above 30 Mb in size.
>> > > >
>> > > > EDF Energy Limited
>> > > > Registered in England and Wales No. 2366852 Registered Office: 90
>> > > > Whitfield Street, London W1T 4EZ