... However - duplicate subscriptions is a useful feature to exploit if you are only using Queues. As the load balancing algorithm will attempt to share message load evenly, consumers across a network will equally share the message load only if the flag conduitSubscriptions=false . Here's an example. Suppose you have two brokers, A and B, that are connected to one another via a forwarding bridge. Connected to broker A, you have a consumer that subscribes to a queue called Q.TEST . Connected to broker B, you have two consumers that also subscribe to Q.TEST . All consumers have equal priority. Then you start a producer on broker A that writes 30 messages to Q.TEST . By default, (conduitSubscriptions=true ), 15 messages will be sent to the consumer on broker A and the resulting 15 messages will be sent to the two consumers on broker B. The message load has not been equally spread across all three consumers because, by default, broker A views the two subscriptions on broker B as one. If you had set conduitSubscriptions to " to false ", then each of the three consumers would have been given 10 messages. ... By default a network bridge forwards messages on demand in one direction over a single connection. When dupex duplex=true , the same connection is used for a network bridge in the opposite directions, resulting in a by bi-directional bridge. The network bridge configuration is propagated to the other broker so the duplex bridge is an exact replica or the original. Given two brokers, broker A and broker B, a duplex bridge on A to B is the same as a default bridge on A to B and a default bridge on B to A. Note, if you want to configure more than one duplex network bridge between two brokers, to increase throughput or to partition topics and queues, you must provide unique names for each: eg:
Code Block |
|
<networkConnectors>
<networkConnector name="SYSTEM1" duplex="true" uri="static:(tcp://10.x.x.x:61616)">
<dynamicallyIncludedDestinations>
<topic physicalName="outgoing.System1" />
</dynamicallyIncludedDestinations>
</networkConnector>
<networkConnector name="SYSTEM2" duplex="true" uri="static:(tcp://10.x.x.x:61616)">
<dynamicallyIncludedDestinations>
<topic physicalName="outgoing.System2"/>
</dynamicallyIncludedDestinations>
</networkConnector>
</networkConnectors> |
...
Warning |
Networks do not work as expected (they cannot dynamically respond to new consumers) if the advisorySupport broker property is disabled. A fully statically configured network is the only option if advisorySupport is disabled. Read more about it in the following section. |
Networks of brokers and advisories ...
Code Block |
|
<networkConnector uri="static:(tcp://host)">
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
</networkConnector> |
In earlier versions of ActiveMQ prior to 5.6, the broker would still use the same advisory filter and express interest in all consumers on the remote broker. The actual filtering will be done during message dispatch. This is suboptimal solution in huge networks as it creates a lot of "advisory" traffic and load on the brokers. Starting with version 5.6, the broker will automatically create an appropriate advisory filter and express interest only in dynamically included destinations. For our example it will be "ActiveMQ.Advisory.Consumer.Queue.include.test.foo,ActiveMQ.Advisory.Consumer.Topic.include.test.bar ". This can dramatically improve behavior of the network in complex and high-load environments. So what's to be done in older versions of the broker? Luckily, In older broker versions we can achieve the same thing with a bit slightly more complicated configuration. The actual advisory filter that controls in which consumers we are interested is defined with the destinationFilter connector property. It's Its default value is ">", which is concatenated to the "ActiveMQ.Advisory.Consumer." prefix. So to achieve the same thing, we would need to do the following:
Code Block |
|
<networkConnector uri="static:(tcp://host)" destinationFilter="Queue.include.test.foo,ActiveMQ.Advisory.Consumer.Topic.include.test.bar">
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
</networkConnector> |
Note that first destination doesn't does not have the prefix because it's already implied. It's a bit more complicated to set and maintain, but it will work. And if you're using 5.6 or newer version of the broker just including desired destinations with dynamicallyIncludedDestinations should suffice. This also explains why dynamic networks doesn't do not work if you turn off advisory support on the brokers. The brokers in this case cannot dynamically respond to new consumers. ... The staticBridge parameter is available since version 5.6 and it means that the local broker will not subscribe to any advisory topic topics on the remote broker, meaning it is not interested in whether there are any consumers there. Additionally, you need to add a list of destinations to staticallyIncludedDestinations . This will have the same effect as having an additional consumer on the destinations so messages will be forwarded to the remote broker as well. As there 's is no staticBridge parameter in the earlier versions of ActiveMQ, you can trick the broker by setting destinationFilter to listen to an unused advisory topic, like ... Here is an example of two brokers networked together. The local broker Local Broker contains the network connector configured with a dynamicallyIncludedDestination and the remote broker Remote Broker is configured with a CompositeTopic:
Code Block |
language |
xml |
title |
Local Broker |
|
<networkConnector uri="static:(tcp://host)">
<dynamicallyIncludedDestinations>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
</networkConnector> |
Code Block |
language |
xml |
title |
Remote Broker |
|
<compositeTopic name="include.test.bar" forwardOnly="false">
<forwardTo>
<queue physicalName="include.test.bar.forward" />
</forwardTo>
</compositeTopic > |
In this example, let's consider a single consumer on the remote broker Remote Broker on the queue include. test.bar.forward . If a message is sent directly to the remote broker Remote Broker on the topic include.test.bar , it will it will be forwarded to the queue include.test.bar.forward and forward and the consumer will receive it. However, if a message is published to the same topic on the local brokerLocal Broker, this message will not be forwarded to the remote brokerRemote Broker. The message is not forwarded because a consumer on the queue include.test.bar.forward would not be detected as part of the dynamicallyIncludedDestinations list so messages in the Local Broker. Messages would not be forwarded to the remote broker Remote Broker unless there was a consumer on the original topic as well , (in this case include. test.bar ). This can be fixed by configuring the broker Local Broker to listen for Virtual Destination Subscriptions. First, we need to configure the broker Remote Broker to send advisory messages when consumers come online onto subscribe to a destination that matches a Virtual Destination. In this case, internally a match is determined by through the use of a Destination Filter will determines if a a that determines whether one destination forwards to another destination. The configuration on the remote broker should be updated to To enable this, set the property useVirtualDestSubs on the Remote Broker to true to enable this.:
Code Block |
language |
xml |
title |
Remote Broker |
|
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://activemq.org/config/1.0">
<broker name="broker1remoteBroker" useVirtualDestSubs="true">
.....
</broker>
</beans>
|
SecondNext, we need to configure the network connector on the Local Broker needs to be configured to listen for the new advisory messages by setting the useVirtualDestSubs property to true :
Code Block |
language |
xml |
title |
Local Broker |
|
<networkConnector uri="static:(tcp://host)" useVirtualDestSubs="true">
<dynamicallyIncludedDestinations>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
</networkConnector> |
Now, if a consumer comes online for subscribes to the queue include.test.bar.forward on the remote brokerRemote Broker, the local broker Local Broker will know to forward messages that are sent to the topic include.testbar.bar Virtual Destination ... Consumers on ... Destination Creation Now let's consider the use case above where there is the same composite topic but no consumers on the queue.
Code Block |
language |
xml |
title |
Remote Broker |
|
<compositeTopic name="include.test.bar" forwardOnly="false">
<forwardTo>
<queue physicalName="include.test.bar.forward" />
</forwardTo>
</compositeTopic > |
There is a composite Composite Topic configured on the remote broker Remote Broker, and the local broker Local Broker is networked to it. Even Even though we 've have enabled useVirtualDestSubs , messages wouldn't will not be forwarded to the remote broker Remote Broker unless a consumer comes online subscribes to the forwarded queue. This would cause messages to Without a consumer, messages would be dropped since as they are sent to a topic . Sometimes on the Local Broker (include.bar ). In this situation it is desirable to have these messages forwarded based on the existence of the a virtual destination . This only applies in the Queue case. For topics, messages should only be forwarded if there is a consumer or durable subscription.that forwards to:
- one or more queues; or
- topics that have durable subscriptions.
Both of these conditions are considered as emitting demand for messages to the Local Broker, despite there being no active consumers on those destinations.
Code Block |
language |
xml |
title |
Local Remote Broker |
|
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://activemq.org/config/1.0">
<broker name="broker1remoteBroker" useVirtualDestSubs="true" useVirtualDestSubsOnCreation="true">
.....
</broker>
</beans> |
With this configuration, when the Queue includequeue include. test.bar.forward is created, a Virtual Destination consumer advisory will be sent to the local broker Local Broker so that it knows to forward messages based on the existence of the CompositeTopic and Queue existingComposite Topic that forwards to a queue. ... Composite Destination consumers and Virtual Topics The above examples show how to configure a Composite destination Destination but a Virtual Topic will also work. In the example below, a consumer on a Queue queue for a Virtual Topic on the Remote Broker will now cause demand and messages will be sent across a network from the Local Broker.
Code Block |
language |
xml |
title |
Local Broker |
|
<networkConnector uri="static:(tcp://host)">
<dynamicallyIncludedDestinations>
<topic physicalName="VirtualTopic.>"/>
</dynamicallyIncludedDestinations>
</networkConnector> |
Code Block |
language |
xml |
title |
Remote Broker |
|
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://activemq.org/config/1.0">
<broker name="broker1remoteBroker" useVirtualDestSubs="true" >
.....
</broker>
</beans> |
... It is possible to have more than one network connector between two brokers. Each network connector uses one underlying transport connection, so you may wish to do this to increase throughput, or have a more flexible configuration. For example, if using distributed queues, you may wish to have equivalent weighting to queue receivers across the network, but only when the receivers are active - e.g. ... N.B. You can only use wildcards in the excludedDestinations and dynamicallyIncludedDestinations properties. N.B. Do not change the name of the bridge or the name of the Broker if you are using durable topic subscribers across the network. Internally ActiceMQ ActiveMQ uses the network name and broker name to build a unique but repeatable durable subscriber name for the network. ... By default, it is not permissible for a message to be replayed back to the broker from which it came. This ensures that messages do not loop when duplex or by directional network connectors are configured. Occasionally it is desirable to allow replay for queues. Consider a scenario where a bidirectional bridge exists between a broker pair. Producers and Consumers get to randomly choose a broker using the failover transport. If one broker is restarted for maintenance, messages accumulated on that broker, that crossed the network bridge, will not be available to consumers till they reconnect to the broker. One solution to this problem is to force a client reconnect using rebalanceClusterClients. Another, is to allow replay of messages back to the origin as there is no local consumer on that broker. There is a destination policy that allows this behavior for queues by configuring a conditionalNetworkBridgeFilterFactory with replayWhenNoConsumers=true . The conditionalNetworkBridgeFilterFactory provides an optional replayDelay based on the broker-in time.
Code Block |
|
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="TEST.>" enableAudit="false">
<networkBridgeFilterFactory>
<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/>
</networkBridgeFilterFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy> |
N.B.: When using replayWhenNoConsumers=true for versions < 5.9, it is necessary to also disable the cursors duplicate detection using enableAudit=false as the cursor could mark the replayed messages as duplicates (depending on the time window between playing and replaying these messages over the network bridge). The problem is fully explained in this blog post. Throttling a network consumer The conditionalNetworkBridgeFilterFactory factory allows a rate limit to be specified for a destination, such that network consumer can be throttled. Prefetch for a network consumer is largely negated by the fact that a network consumer relays a message typically acks very quickly so even with a low prefetch and decreased priority a network consumer can starve a modestly quick local consumer. Throttling provides a remedy for this. |