http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml b/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml index 5b3221d..363ae96 100644 --- a/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml +++ b/examples/jms/colocated-failover/src/main/resources/activemq/server0/broker.xml @@ -72,7 +72,7 @@ under the License. <connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>true</forward-when-no-consumers> + <message-load-balancing>STRICT</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml b/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml index 84f38f1..464fdee 100644 --- a/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml +++ b/examples/jms/colocated-failover/src/main/resources/activemq/server1/broker.xml @@ -72,7 +72,7 @@ under the License. <connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>true</forward-when-no-consumers> + <message-load-balancing>STRICT</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml b/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml index 8c38aea..79dd692 100644 --- a/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml +++ b/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server0/broker.xml @@ -81,7 +81,7 @@ under the License. 
<connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>true</forward-when-no-consumers> + <message-load-balancing>STRICT</message-load-balancing> <max-hops>1</max-hops> <static-connectors> <connector-ref>server1-connector</connector-ref> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml b/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml index ca0f811..7b135ac 100644 --- a/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml +++ b/examples/jms/ha-policy-autobackup/src/main/resources/activemq/server1/broker.xml @@ -81,7 +81,7 @@ under the License. <connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>true</forward-when-no-consumers> + <message-load-balancing>STRICT</message-load-balancing> <max-hops>1</max-hops> <static-connectors> <connector-ref>server0-connector</connector-ref> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml b/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml index e745c71..2d771b0 100644 --- a/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml +++ b/examples/jms/queue-message-redistribution/src/main/resources/activemq/server0/broker.xml @@ -72,7 
+72,7 @@ under the License. <connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>false</forward-when-no-consumers> + <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml b/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml index 7bb693f..870be49 100644 --- a/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml +++ b/examples/jms/queue-message-redistribution/src/main/resources/activemq/server1/broker.xml @@ -73,7 +73,7 @@ under the License. 
<connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>false</forward-when-no-consumers> + <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml b/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml index 4a38a74..33de64f 100644 --- a/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml +++ b/examples/jms/scale-down/src/main/resources/activemq/server0/broker.xml @@ -73,7 +73,7 @@ under the License. <retry-interval>500</retry-interval> <reconnect-attempts>5</reconnect-attempts> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>true</forward-when-no-consumers> + <message-load-balancing>STRICT</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml b/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml index 57fa72d..7602ef9 100644 --- a/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml +++ b/examples/jms/scale-down/src/main/resources/activemq/server1/broker.xml @@ -74,7 +74,7 @@ under the License. 
<retry-interval>500</retry-interval> <reconnect-attempts>5</reconnect-attempts> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>true</forward-when-no-consumers> + <message-load-balancing>STRICT</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/readme.html ---------------------------------------------------------------------- diff --git a/examples/jms/symmetric-cluster/readme.html b/examples/jms/symmetric-cluster/readme.html index 116735c..8be9587 100644 --- a/examples/jms/symmetric-cluster/readme.html +++ b/examples/jms/symmetric-cluster/readme.html @@ -46,7 +46,7 @@ under the License. <p>Using UDP discovery makes configuration simpler since we don't have to know what nodes are available at any one time.</p> <p>Here's the relevant snippet from the server configuration, which tells the server to form a cluster - with the other nodes:</p> + with the other nodes:</p> <pre class="prettyprint"> <code> <cluster-connection name="my-cluster"> @@ -54,19 +54,19 @@ under the License. <connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>true</forward-when-no-consumers> + <message-load-balancing>STRICT</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> </code> - </pre> + </pre> <p>In this example we create a symmetric cluster of six live nodes, and we also pair each live node with it's own backup node. 
(A backup node is not strictly necessary for a symmetric cluster).</p> <p>In this example we will demonstrate this by deploying a JMS topic and Queue on all nodes of the cluster, sending messages to the queue and topic from different nodes, and verifying messages are received correctly by consumers on different nodes.</p> <p>For more information on configuring ActiveMQ Artemis clustering in general, please see the clustering - section of the user manual.</p> + section of the user manual.</p> <h2>Example step-by-step</h2> <p><i>To run the example, simply type <code>mvn verify -Pexample</code> from this directory</i></p> @@ -77,12 +77,12 @@ under the License. specific server to do that, and that server might not be available at the time. By creating the connection factory directly we avoid having to worry about a JNDI look-up. In an app server environment you could use HA-JNDI to lookup from the clustered JNDI servers without - having to know about a specific one. + having to know about a specific one. </li> - + <pre class="prettyprint"> <code> - ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithHA("231.7.7.7", 9876); + ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithHA("231.7.7.7", 9876); </code> </pre> @@ -90,8 +90,8 @@ under the License. <pre class="prettyprint"> <code> Queue queue = new ActiveMQQueue("exampleQueue"); - - Topic topic = ActiveMQJMSClient.createActiveMQTopic("exampleTopic"); + + Topic topic = ActiveMQJMSClient.createActiveMQTopic("exampleTopic"); </code> </pre> @@ -165,7 +165,7 @@ under the License. MessageConsumer consumer0 = session0.createConsumer(queue); </code> </pre> - + <li>We create an anonymous message producer on server 2.</li> <pre class="prettyprint"> <code> @@ -177,15 +177,15 @@ under the License. 
<pre class="prettyprint"> <code> final int numMessages = 500; - + for (int i = 0; i < numMessages; i++) { TextMessage message1 = session2.createTextMessage("Topic message 1"); - + producer2.send(topic, message1); - + TextMessage message2 = session2.createTextMessage("Queue message 1"); - + producer2.send(queue, message2); } </code> @@ -262,7 +262,7 @@ under the License. { connection1.close(); } - + if (connection2 != null) { connection2.close(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml index 312d7a2..27b7489 100644 --- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml +++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server0/broker.xml @@ -76,7 +76,7 @@ under the License. 
<connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>false</forward-when-no-consumers> + <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml index f7c2569..16a7931 100644 --- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml +++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server1/broker.xml @@ -74,7 +74,7 @@ under the License. <connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>false</forward-when-no-consumers> + <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml index b87a19e..30f33a0 100644 --- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml +++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server2/broker.xml @@ -74,7 +74,7 @@ under the License. 
<connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>false</forward-when-no-consumers> + <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml index 0374876..afda21b 100644 --- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml +++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server3/broker.xml @@ -74,7 +74,7 @@ under the License. <connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>false</forward-when-no-consumers> + <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml index 25b099c..321d57b 100644 --- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml +++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server4/broker.xml @@ -73,7 +73,7 @@ under the License. 
<connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>false</forward-when-no-consumers> + <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml ---------------------------------------------------------------------- diff --git a/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml b/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml index 8efb4aa..abbe9ad 100644 --- a/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml +++ b/examples/jms/symmetric-cluster/src/main/resources/activemq/server5/broker.xml @@ -73,7 +73,7 @@ under the License. <connector-ref>netty-connector</connector-ref> <retry-interval>500</retry-interval> <use-duplicate-detection>true</use-duplicate-detection> - <forward-when-no-consumers>false</forward-when-no-consumers> + <message-load-balancing>ON_DEMAND</message-load-balancing> <max-hops>1</max-hops> <discovery-group-ref discovery-group-name="my-discovery-group"/> </cluster-connection> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java index 0572255..732c5ed 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java 
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ClusteredGroupingTest.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; import org.apache.activemq.artemis.core.server.group.impl.Response; import org.apache.activemq.artemis.core.server.management.Notification; @@ -66,9 +67,9 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(0, isFileStorage(), isNetty()); setupServer(1, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1); - setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -141,11 +142,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); - 
setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -226,11 +227,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -312,13 +313,13 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(2, isFileStorage(), isNetty()); setupServer(3, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2, 3); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2, 3); - setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2, 3); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2, 3); - setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1, 3); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, 
isNetty(), 2, 0, 1, 3); - setupClusterConnection("cluster3", "queues", false, 1, 0, 500, isNetty(), 3, 1, 2, 3); + setupClusterConnection("cluster3", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 3, 1, 2, 3); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java index bf98b4b..5358bae 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailoverTest.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.config.ScaleDownConfiguration; import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; @@ -60,9 +61,9 @@ public class ScaleDownFailoverTest extends ClusterTestBase scaleDownConfiguration3.setGroupName("bill"); } staticServers = servers; - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); - 
setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); scaleDownConfiguration.getConnectors().addAll(servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); scaleDownConfiguration2.getConnectors().addAll(servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); scaleDownConfiguration3.getConnectors().addAll(servers[2].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java ---------------------------------------------------------------------- diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java index 46ea3b1..5c64aa5 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/ScaleDownFailureTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.extras.byteman; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.core.config.ScaleDownConfiguration; import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMUnitRunner; @@ -44,8 +45,8 @@ public class ScaleDownFailureTest extends ClusterTestBase ((LiveOnlyPolicyConfiguration) servers[0].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration); ((LiveOnlyPolicyConfiguration) servers[1].getConfiguration().getHAPolicyConfiguration()).setScaleDownConfiguration(scaleDownConfiguration); } - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0); startServers(0, 1); setupSessionFactory(0, isNetty()); setupSessionFactory(1, isNetty()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java index 84541e6..cf352fd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/ClusterControllerTest.java @@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtocolManagerFactory; import 
org.apache.activemq.artemis.core.server.cluster.ClusterControl; import org.apache.activemq.artemis.core.server.cluster.ClusterController; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase; import org.junit.Before; import org.junit.Test; @@ -48,8 +49,8 @@ public class ClusterControllerTest extends ClusterTestBase getServer(1).getConfiguration().setClusterPassword("something different"); - setupClusterConnection("cluster0", "queues", false, 1, true, 0); - setupClusterConnection("cluster0", "queues", false, 1, true, 1); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 0); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, true, 1); startServers(0); startServers(1); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java index c89ac1b..421a2fb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterHeadersRemovedTest.java @@ -15,6 +15,7 @@ * limitations under the License. 
*/ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.junit.Before; import org.junit.Test; @@ -46,8 +47,8 @@ public class ClusterHeadersRemovedTest extends ClusterTestBase @Test public void testHeadersRemoved() throws Exception { - setupClusterConnection("cluster1", 0, 1, "queues", false, 1, isNetty(), false); - setupClusterConnection("clusterX", 1, -1, "queues", false, 1, isNetty(), false); + setupClusterConnection("cluster1", 0, 1, "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), false); + setupClusterConnection("clusterX", 1, -1, "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), false); startServers(1, 0); setupSessionFactory(0, isNetty()); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 9b4017b..371dee4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -57,6 +57,7 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration; @@ -1913,7 +1914,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase final int nodeFrom, final int nodeTo, final String address, - final boolean forwardWhenNoConsumers, + final MessageLoadBalancingType messageLoadBalancingType, final int maxHops, final boolean netty, final boolean allowDirectConnectionsOnly) @@ -1944,7 +1945,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase .setAddress(address) .setConnectorName(name) .setRetryInterval(100) - .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMessageLoadBalancingType(messageLoadBalancingType) .setMaxHops(maxHops) .setConfirmationWindowSize(1024) .setStaticConnectors(pairs) @@ -1957,7 +1958,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase final int nodeFrom, final int nodeTo, final String address, - final boolean forwardWhenNoConsumers, + final MessageLoadBalancingType messageLoadBalancingType, final int maxHops, final int reconnectAttempts, final long retryInterval, @@ -1991,7 +1992,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase .setConnectorName(name) .setReconnectAttempts(reconnectAttempts) .setRetryInterval(retryInterval) - .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMessageLoadBalancingType(messageLoadBalancingType) .setMaxHops(maxHops) .setConfirmationWindowSize(1024) .setStaticConnectors(pairs) @@ -2002,7 +2003,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase protected void setupClusterConnection(final String name, final String address, - final boolean forwardWhenNoConsumers, + final MessageLoadBalancingType messageLoadBalancingType, final int maxHops, final boolean netty, final 
int nodeFrom, @@ -2027,7 +2028,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase } Configuration config = serverFrom.getConfiguration(); ClusterConnectionConfiguration clusterConf = - createClusterConfig(name, address, forwardWhenNoConsumers, + createClusterConfig(name, address, messageLoadBalancingType, maxHops, connectorFrom, pairs); @@ -2037,7 +2038,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase protected void setupClusterConnection(final String name, final String address, - final boolean forwardWhenNoConsumers, + final MessageLoadBalancingType messageLoadBalancingType, final int maxHops, final int reconnectAttempts, final long retryInterval, @@ -2072,7 +2073,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase .setReconnectAttempts(reconnectAttempts) .setCallTimeout(100) .setCallFailoverTimeout(100) - .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMessageLoadBalancingType(messageLoadBalancingType) .setMaxHops(maxHops) .setConfirmationWindowSize(1024) .setStaticConnectors(pairs); @@ -2081,7 +2082,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase } private ClusterConnectionConfiguration createClusterConfig(final String name, final String address, - final boolean forwardWhenNoConsumers, final int maxHops, + final MessageLoadBalancingType messageLoadBalancingType, final int maxHops, TransportConfiguration connectorFrom, List<String> pairs) { return new ClusterConnectionConfiguration() @@ -2089,7 +2090,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase .setAddress(address) .setConnectorName(connectorFrom.getName()) .setRetryInterval(250) - .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMessageLoadBalancingType(messageLoadBalancingType) .setMaxHops(maxHops) .setConfirmationWindowSize(1024) .setStaticConnectors(pairs); @@ -2097,7 +2098,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase protected void setupClusterConnectionWithBackups(final 
String name, final String address, - final boolean forwardWhenNoConsumers, + final MessageLoadBalancingType messageLoadBalancingType, final int maxHops, final boolean netty, final int nodeFrom, @@ -2127,7 +2128,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase .setAddress(address) .setConnectorName(name) .setRetryInterval(250) - .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMessageLoadBalancingType(messageLoadBalancingType) .setMaxHops(maxHops) .setConfirmationWindowSize(1024) .setStaticConnectors(pairs); @@ -2139,7 +2140,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase final int node, final String discoveryGroupName, final String address, - final boolean forwardWhenNoConsumers, + final MessageLoadBalancingType messageLoadBalancingType, final int maxHops, final boolean netty) { @@ -2159,7 +2160,7 @@ public abstract class ClusterTestBase extends ActiveMQTestBase .setConnectorName(name) .setRetryInterval(100) .setDuplicateDetection(true) - .setForwardWhenNoConsumers(forwardWhenNoConsumers) + .setMessageLoadBalancingType(messageLoadBalancingType) .setMaxHops(maxHops) .setConfirmationWindowSize(1024) .setDiscoveryGroupName(discoveryGroupName); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java index 474a953..0e5c2a1 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java +++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterWithBackupTest.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.junit.Before; @@ -83,22 +84,22 @@ public class ClusterWithBackupTest extends ClusterTestBase protected void setupCluster() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); } - protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception + protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { - setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 4, 5); + setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 3, 4, 5); - setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 3, 5); + setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 4, 3, 5); - setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 5, 3, 4); + setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 5, 3, 4); - setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 4, 5); + setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 4, 5); - setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 3, 5); + setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 3, 5); - setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 3, 4); + setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 3, 4); 
} protected void setupServers() throws Exception http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java index 9efcae4..d4446a8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.core.postoffice.impl.BindingsImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.UnproposalListener; import org.apache.activemq.artemis.core.server.group.impl.GroupBinding; @@ -59,11 +60,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", 
MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 2000, 1000, 100); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 2000, 1000, 100); @@ -168,11 +169,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, -1, 2000, 500); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -261,11 +262,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); 
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -308,11 +309,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); final int TIMEOUT_GROUPS = 5000; setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, TIMEOUT_GROUPS, -1, -1); @@ -424,11 +425,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 1000, 1000, 100); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 1000, 100, 100); @@ -515,9 +516,9 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, 
isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, 10000, 500, 750); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 10000, 500, 750); @@ -823,11 +824,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); int TIMEOUT = 50000; setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0, TIMEOUT); @@ -904,11 +905,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + 
setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -983,11 +984,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1, 0); @@ -1140,11 +1141,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); 
setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1187,11 +1188,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1238,11 +1239,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1285,11 +1286,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, 
isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1335,11 +1336,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1381,11 +1382,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 
isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1451,11 +1452,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1514,11 +1515,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 
2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1577,11 +1578,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, 0, 500, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); @@ -1635,11 +1636,11 @@ public class ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); @@ -1686,11 +1687,11 @@ public class 
ClusteredGroupingTest extends ClusterTestBase setupServer(1, isFileStorage(), isNetty()); setupServer(2, isFileStorage(), isNetty()); - setupClusterConnection("cluster0", "queues", false, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", false, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", false, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1); setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java index d6512f4..9dc4123 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredRequestResponseTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.junit.Before; import 
org.junit.Test; @@ -129,20 +130,20 @@ public class ClusteredRequestResponseTest extends ClusterTestBase protected void setupCluster() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); } - protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception + protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { - setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2, 3, 4); + setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2, 3, 4); - setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2, 3, 4); + setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2, 3, 4); - setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1, 3, 4); + setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1, 3, 4); - setupClusterConnection("cluster3", "queues", forwardWhenNoConsumers, 1, isNetty(), 3, 0, 1, 2, 4); + setupClusterConnection("cluster3", "queues", messageLoadBalancingType, 1, isNetty(), 3, 0, 1, 2, 4); - setupClusterConnection("cluster4", "queues", forwardWhenNoConsumers, 1, isNetty(), 4, 0, 1, 2, 3); + setupClusterConnection("cluster4", "queues", messageLoadBalancingType, 1, isNetty(), 4, 0, 1, 2, 3); } protected void setupServers() throws Exception http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java new file mode 100644 index 0000000..3cd9e61 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageLoadBalancingTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.cluster.distribution; + +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class MessageLoadBalancingTest extends ClusterTestBase +{ + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + + start(); + } + + private void start() throws Exception + { + setupServers(); + + setRedistributionDelay(0); + } + + protected boolean isNetty() + { + return false; + } + + @Test + public void testMessageLoadBalancingOff() throws Exception + { + setupCluster(MessageLoadBalancingType.OFF); + + startServers(0, 1); + + setupSessionFactory(0, isNetty()); + setupSessionFactory(1, isNetty()); + + createQueue(0, "queues.testaddress", "queue0", null, false); + createQueue(1, "queues.testaddress", "queue0", null, false); + + addConsumer(1, 1, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 0, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 1, 1, false); + waitForBindings(1, "queues.testaddress", 1, 0, false); + + addConsumer(0, 0, "queue0", null); + + waitForBindings(0, "queues.testaddress", 1, 1, true); + waitForBindings(1, "queues.testaddress", 1, 1, true); + + waitForBindings(0, "queues.testaddress", 1, 1, false); + waitForBindings(1, "queues.testaddress", 1, 1, false); + + send(0, "queues.testaddress", 10, false, null); + + ClientMessage message = getConsumer(1).receive(1000); + Assert.assertNull(message); + + for (int i = 0; i < 10; i++) + { + message = getConsumer(0).receive(5000); + Assert.assertNotNull("" + i, message); + message.acknowledge(); + } + + ClientMessage clientMessage = getConsumer(0).receiveImmediate(); + Assert.assertNull(clientMessage); + } + + 
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception + { + setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1); + + setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0); + } + + protected void setRedistributionDelay(final long delay) + { + AddressSettings as = new AddressSettings().setRedistributionDelay(delay); + + getServer(0).getAddressSettingsRepository().addMatch("queues.*", as); + getServer(1).getAddressSettingsRepository().addMatch("queues.*", as); + } + + protected void setupServers() throws Exception + { + setupServer(0, isFileStorage(), isNetty()); + setupServer(1, isFileStorage(), isNetty()); + } + + protected void stopServers() throws Exception + { + closeAllConsumers(); + + closeAllSessionFactories(); + + closeAllServerLocatorsFactories(); + + stopServers(0, 1); + + clearServer(0, 1); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java index ebdac36..d294e75 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java @@ -28,6 +28,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import 
org.apache.activemq.artemis.api.core.client.ClientProducer; import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; import org.apache.activemq.artemis.core.message.impl.MessageImpl; import org.apache.activemq.artemis.core.server.Bindable; @@ -69,7 +70,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWithMessageGroups() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); MessageRedistributionTest.log.info("Doing test"); @@ -174,7 +175,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionStopsWhenConsumerAdded() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); MessageRedistributionTest.log.info("Doing test"); @@ -212,7 +213,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWhenConsumerIsClosed() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); MessageRedistributionTest.log.info("Doing test"); @@ -254,7 +255,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWhenConsumerIsClosedDifferentQueues() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -334,7 +335,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWhenConsumerIsClosedNotConsumersOnAllNodes() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -371,7 +372,7 @@ public class MessageRedistributionTest extends ClusterTestBase public void testNoRedistributionWhenConsumerIsClosedForwardWhenNoConsumersTrue() throws 
Exception { // x - setupCluster(true); + setupCluster(MessageLoadBalancingType.STRICT); startServers(0, 1, 2); @@ -423,7 +424,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testNoRedistributionWhenConsumerIsClosedNoConsumersOnOtherNodes() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -473,7 +474,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributeWithScheduling() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); AddressSettings setting = new AddressSettings().setRedeliveryDelay(10000); servers[0].getAddressSettingsRepository().addMatch("queues.testaddress", setting); @@ -583,7 +584,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWhenConsumerIsClosedQueuesWithFilters() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -624,7 +625,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWhenConsumerIsClosedConsumersWithFilters() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -665,7 +666,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWhenRemoteConsumerIsAdded() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -702,7 +703,7 @@ public class MessageRedistributionTest extends ClusterTestBase { for (int i = 0; i < 10; i++) { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -812,7 +813,7 @@ public class MessageRedistributionTest extends ClusterTestBase } for (int i = 0; i < 10; i++) { - setupCluster(false); + 
setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1); @@ -877,7 +878,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionToQueuesWhereNotAllMessagesMatch() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -919,7 +920,7 @@ public class MessageRedistributionTest extends ClusterTestBase final long delay = 1000; setRedistributionDelay(delay); - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -959,7 +960,7 @@ public class MessageRedistributionTest extends ClusterTestBase final long delay = 1000; setRedistributionDelay(delay); - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -999,7 +1000,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionNumberOfMessagesGreaterThanBatchSize() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0, 1, 2); @@ -1037,7 +1038,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWhenNewNodeIsAddedWithConsumer() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); startServers(0); @@ -1069,7 +1070,7 @@ public class MessageRedistributionTest extends ClusterTestBase @Test public void testRedistributionWithPagingOnTarget() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); AddressSettings as = new AddressSettings() .setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE) @@ -1136,13 +1137,13 @@ public class MessageRedistributionTest extends ClusterTestBase session1.close(); } - protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception + protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { - 
setupClusterConnection("cluster0", "queues", forwardWhenNoConsumers, 1, isNetty(), 0, 1, 2); + setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2); - setupClusterConnection("cluster1", "queues", forwardWhenNoConsumers, 1, isNetty(), 1, 0, 2); + setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2); - setupClusterConnection("cluster2", "queues", forwardWhenNoConsumers, 1, isNetty(), 2, 0, 1); + setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1); } protected void setRedistributionDelay(final long delay) http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/20326d0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java index ae6df0c..af15179 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionWithDiscoveryTest.java @@ -15,6 +15,7 @@ * limitations under the License. 
*/ package org.apache.activemq.artemis.tests.integration.cluster.distribution; +import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; import org.junit.Before; @@ -53,22 +54,22 @@ public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase protected void setupCluster() throws Exception { - setupCluster(false); + setupCluster(MessageLoadBalancingType.ON_DEMAND); } - protected void setupCluster(final boolean forwardWhenNoConsumers) throws Exception + protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception { for (int i = 0; i < 5; i++) { - setServer(forwardWhenNoConsumers, i); + setServer(messageLoadBalancingType, i); } } /** - * @param forwardWhenNoConsumers + * @param messageLoadBalancingType * @throws Exception */ - protected void setServer(final boolean forwardWhenNoConsumers, int server) throws Exception + protected void setServer(final MessageLoadBalancingType messageLoadBalancingType, int server) throws Exception { setupLiveServerWithDiscovery(server, groupAddress, @@ -83,7 +84,7 @@ public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase servers[server].getAddressSettingsRepository().addMatch("#", setting); - setupDiscoveryClusterConnection("cluster" + server, server, "dg1", "queues", forwardWhenNoConsumers, 1, isNetty()); + setupDiscoveryClusterConnection("cluster" + server, server, "dg1", "queues", messageLoadBalancingType, 1, isNetty()); } @Test @@ -145,7 +146,7 @@ public class MessageRedistributionWithDiscoveryTest extends ClusterTestBase servers[0].stop(); servers[0] = null; - setServer(false, 0); + setServer(MessageLoadBalancingType.ON_DEMAND, 0); startServers(1, 2);