This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 224b898 ARTEMIS-2007 - allow redistribution if there are unmatched
messages pending on a queue and there is new remote demand
224b898 is described below
commit 224b89810d023e3ee73b014a48585da8f6048237
Author: gtully <[email protected]>
AuthorDate: Thu Jun 24 16:24:48 2021 +0100
ARTEMIS-2007 - allow redistribution if there are unmatched messages pending
on a queue and there is new remote demand
---
.../protocol/mqtt/MQTTSubscriptionManager.java | 6 +-
.../artemis/core/server/ActiveMQServerLogger.java | 2 +-
.../apache/activemq/artemis/core/server/Queue.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 118 +++---
.../server/impl/ScheduledDeliveryHandlerTest.java | 2 +-
docs/user-manual/en/clusters.md | 71 +---
.../cluster/SelectorRedistributionClusterTest.java | 406 +++++++++++++++++++++
.../tests/unit/core/postoffice/impl/FakeQueue.java | 2 +-
.../tests/unit/core/server/impl/QueueImplTest.java | 98 +++++
.../unit/core/server/impl/fakes/FakeConsumer.java | 7 +
10 files changed, 593 insertions(+), 121 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index c23f994..b6db148 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -211,8 +211,10 @@ public class MQTTSubscriptionManager {
Set<Consumer> queueConsumers;
if (queue != null && (queueConsumers = (Set<Consumer>)
queue.getConsumers()) != null) {
for (Consumer consumer : queueConsumers) {
- ((ServerConsumer) consumer).close(false);
- consumerQoSLevels.remove(((ServerConsumer) consumer).getID());
+ if (consumer instanceof ServerConsumer) {
+ ((ServerConsumer) consumer).close(false);
+ consumerQoSLevels.remove(((ServerConsumer)
consumer).getID());
+ }
}
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index a056c84..6bae295 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1481,7 +1481,7 @@ public interface ActiveMQServerLogger extends BasicLogger
{
void unableToFlushDeliveries(@Cause Exception e);
@LogMessage(level = Logger.Level.WARN)
- @Message(id = 222237, value = "Unable to flush deliveries", format =
Message.Format.MESSAGE_FORMAT)
+ @Message(id = 222237, value = "Unable to stop redistributor", format =
Message.Format.MESSAGE_FORMAT)
void unableToCancelRedistributor(@Cause Exception e);
@LogMessage(level = Logger.Level.WARN)
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index dedf190..aa43170 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -380,7 +380,7 @@ public interface Queue extends Bindable,CriticalComponent {
void addRedistributor(long delay);
- void cancelRedistributor() throws Exception;
+ void cancelRedistributor();
boolean hasMatchingConsumer(Message message);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e0d8126..6909cf6 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -113,7 +113,6 @@ import
org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
-import org.apache.activemq.artemis.utils.collections.SingletonIterator;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
@@ -188,6 +187,8 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private boolean mirrorController;
+ private volatile boolean hasUnMatchedPending = false;
+
// Messages will first enter intermediateMessageReferences
// Before they are added to messageReferences
// This is to avoid locking the queue on the producer
@@ -260,7 +261,8 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
private final SimpleString address;
- private ConsumerHolder<Redistributor> redistributor;
+ // redistributor goes in the consumers list, this signals its presence and
allows for easy comparison/check
+ private volatile ConsumerHolder<Redistributor> redistributor;
private ScheduledFuture<?> redistributorFuture;
@@ -1326,12 +1328,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
getExecutor().execute(new Runnable() {
@Override
public void run() {
- try {
- cancelRedistributor();
- } catch (Exception e) {
- // nothing that could be done anyway.. just logging
- ActiveMQServerLogger.LOGGER.unableToCancelRedistributor(e);
- }
+ cancelRedistributor();
}
});
@@ -1391,7 +1388,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
return false;
}
- if (consumers.size() >= consumersBeforeDispatch) {
+ if (getConsumerCount() >= consumersBeforeDispatch) {
if (dispatchingUpdater.compareAndSet(this,
BooleanUtil.toInt(false), BooleanUtil.toInt(true))) {
dispatchStartTimeUpdater.set(this, System.currentTimeMillis());
}
@@ -1419,7 +1416,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
synchronized (this) {
- if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumers.size() >=
maxConsumers) {
+ if (maxConsumers != MAX_CONSUMERS_UNLIMITED && getConsumerCount()
>= maxConsumers) {
throw
ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
}
@@ -1488,7 +1485,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (consumerRemoved) {
consumerRemovedTimestampUpdater.set(this,
System.currentTimeMillis());
- if (consumers.size() == 0) {
+ if (getConsumerCount() == 0) {
stopDispatch();
}
}
@@ -1525,11 +1522,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
supports = false;
}
}
- if (redistributor != null) {
- if (!redistributor.consumer.supportsDirectDelivery()) {
- supports = false;
- }
- }
return supports;
}
@@ -1540,10 +1532,11 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (redistributor != null) {
// Just prompt delivery
deliverAsync();
+ return;
}
if (delay > 0) {
- if (consumers.isEmpty()) {
+ if (consumers.isEmpty() || hasUnMatchedPending) {
DelayedAddRedistributor dar = new
DelayedAddRedistributor(executor);
redistributorFuture = scheduledExecutor.schedule(dar, delay,
TimeUnit.MILLISECONDS);
@@ -1562,17 +1555,34 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
@Override
- public synchronized void cancelRedistributor() throws Exception {
+ public synchronized void cancelRedistributor() {
+ clearRedistributorFuture();
+
if (redistributor != null) {
- redistributor.consumer.stop();
- redistributor = null;
+ try {
+ redistributor.consumer.stop();
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.unableToCancelRedistributor(e);
+ } finally {
+ consumers.remove(redistributor);
+ redistributor = null;
+ }
}
-
- clearRedistributorFuture();
}
@Override
public int getConsumerCount() {
+ // we don't want to count the redistributor, it is an internal transient
entry in the consumer list
+ if (redistributor != null) {
+ synchronized (this) {
+ final int size = consumers.size();
+ if (size > 0 && redistributor != null) {
+ return size - 1;
+ } else {
+ return size;
+ }
+ }
+ }
return consumers.size();
}
@@ -1775,7 +1785,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
public Map<String, List<MessageReference>> getDeliveringMessages() {
final Iterator<ConsumerHolder<? extends Consumer>>
consumerHolderIterator;
synchronized (this) {
- consumerHolderIterator = redistributor == null ? consumers.iterator()
: SingletonIterator.newInstance(redistributor);
+ consumerHolderIterator = consumers.iterator();
}
Map<String, List<MessageReference>> mapReturn = new HashMap<>();
@@ -2767,9 +2777,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
for (ConsumerHolder holder : this.consumers) {
holder.resetIterator();
}
- if (redistributor != null) {
- redistributor.resetIterator();
- }
}
@Override
@@ -2972,6 +2979,10 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
// Either the iterator is empty or the consumer is busy
int noDelivery = 0;
+ // track filters not matching, used to track when all consumers can't
match, redistribution is then an option
+ int numNoMatch = 0;
+ int numAttempts = 0;
+
int handled = 0;
long timeout = System.nanoTime() +
TimeUnit.MILLISECONDS.toNanos(DELIVERY_TIMEOUT);
@@ -2998,9 +3009,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
MessageReference ref;
- // filter evaluation or transformation may cause properties to be
lazyDecoded, we need to reflect that
- int existingMemoryEstimate = 0;
-
Consumer handledconsumer = null;
synchronized (this) {
@@ -3015,14 +3023,10 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
ConsumerHolder<? extends Consumer> holder;
- if (redistributor == null) {
- if (consumers.hasNext()) {
- holder = consumers.next();
- } else {
- break;
- }
+ if (consumers.hasNext()) {
+ holder = consumers.next();
} else {
- holder = redistributor;
+ break;
}
Consumer consumer = holder.consumer;
@@ -3032,6 +3036,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
holder.iter = messageReferences.iterator();
}
+ // LVQ support
ref = nextDelivery();
boolean nextDelivery = false;
if (ref != null) {
@@ -3059,7 +3064,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
logger.trace("Queue " + this.getName() + " is delivering
reference " + ref);
}
- existingMemoryEstimate = ref.getMessageMemoryEstimate();
final SimpleString groupID = extractGroupID(ref);
groupConsumer = getGroupConsumer(groupID);
@@ -3067,15 +3071,18 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
consumer = groupConsumer;
}
+ numAttempts++;
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
- // if a message was delivered, any previous negative attemps
need to be cleared
+ // if a message was delivered, any previous negative
attempts need to be cleared
// this is to avoid breaks on the loop when checking for any
other factors.
noDelivery = 0;
+ numNoMatch = 0;
+ numAttempts = 0;
- if (redistributor == null) {
+ if (consumer != redistributor) {
ref = handleMessageGroup(ref, consumer, groupConsumer,
groupID);
}
@@ -3104,13 +3111,21 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
noDelivery++;
+ numNoMatch = 0;
+ numAttempts = 0;
+ // no consumers.reset() b/c we skip this consumer
} else if (status == HandleStatus.NO_MATCH) {
- // nothing to be done on this case, the iterators will just
jump next
consumers.reset();
+ numNoMatch++;
+ // every attempt resulted in noMatch for number of consumers
means we tried all consumers for a single message
+ if (numNoMatch == numAttempts && numAttempts ==
consumers.size()) {
+ hasUnMatchedPending = true;
+ // one hit of unmatched message is enough, no need to
reset counters
+ }
}
}
- if (redistributor != null || groupConsumer != null) {
+ if (groupConsumer != null) {
if (noDelivery > 0) {
break;
}
@@ -3303,15 +3318,14 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
private void internalAddRedistributor(final ArtemisExecutor executor) {
- // create the redistributor only once if there are no local consumers
- if (consumers.isEmpty() && redistributor == null) {
+ if (redistributor == null && (consumers.isEmpty() ||
hasUnMatchedPending)) {
if (logger.isTraceEnabled()) {
logger.trace("QueueImpl::Adding redistributor on queue " +
this.toString());
}
-
- redistributor = (new ConsumerHolder(new Redistributor(this,
storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE)));
-
+ redistributor = new ConsumerHolder(new Redistributor(this,
storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE));
redistributor.consumer.start();
+ consumers.add(redistributor);
+ hasUnMatchedPending = false;
deliverAsync();
}
@@ -3749,9 +3763,9 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
consumers.reset();
- while (consumers.hasNext() || redistributor != null) {
+ while (consumers.hasNext()) {
- ConsumerHolder<? extends Consumer> holder = redistributor == null
? consumers.next() : redistributor;
+ ConsumerHolder<? extends Consumer> holder = consumers.next();
Consumer consumer = holder.consumer;
final SimpleString groupID = extractGroupID(ref);
@@ -3764,7 +3778,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
- if (redistributor == null) {
+ if (consumer != redistributor) {
reference = handleMessageGroup(ref, consumer, groupConsumer,
groupID);
} else {
reference = ref;
@@ -4527,12 +4541,12 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
logger.debug(getAddress() + ":" + getName() + " has " +
queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is
receiving messages at a rate of " + queueRate + " msgs/second.");
}
-
- if (consumers.size() == 0) {
+ final int consumerCount = getConsumerCount();
+ if (consumerCount == 0) {
logger.debug("There are no consumers, no need to check slow
consumer's rate");
return;
} else {
- float queueThreshold = thresholdInMsgPerSecond * consumers.size();
+ float queueThreshold = thresholdInMsgPerSecond * consumerCount;
if (queueRate < queueThreshold && queueMessages < queueThreshold) {
if (logger.isDebugEnabled()) {
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 068a442..f3b215f 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1475,7 +1475,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void cancelRedistributor() throws Exception {
+ public void cancelRedistributor() {
}
diff --git a/docs/user-manual/en/clusters.md b/docs/user-manual/en/clusters.md
index 3c039c2..d88a7c7 100644
--- a/docs/user-manual/en/clusters.md
+++ b/docs/user-manual/en/clusters.md
@@ -634,9 +634,7 @@ specified. The following shows all the available
configuration options
Keep in mind that this message forwarding/balancing is what we call
"initial distribution." It is different than *redistribution* which
- is [discussed below](#message-redistribution). This distinction is
- important because redistribution is configured differently and has
- unique semantics (e.g. it *does not* support filters (selectors)).
+ is [discussed below](#message-redistribution).
Default is `ON_DEMAND`.
@@ -823,14 +821,14 @@ consumers on the queue the message won't get consumed and
we have a
This is where message redistribution comes in. With message
redistribution Apache ActiveMQ Artemis can be configured to automatically
-*redistribute* messages from queues which have no consumers back to
+*redistribute* messages from queues which have no consumers or consumers
+with filters that don't match messages. The messages are re-routed to
other nodes in the cluster which do have matching consumers. To enable
this functionality `message-load-balancing` must be `ON_DEMAND`.
Message redistribution can be configured to kick in immediately after
-the last consumer on a queue is closed, or to wait a configurable delay
-after the last consumer on a queue is closed before redistributing. By
-default message redistribution is disabled.
+the need to redistribute is detected, or to wait a configurable delay before
redistributing.
+By default, message redistribution is disabled.
Message redistribution can be configured on a per address basis, by
specifying the redistribution delay in the address settings. For more
@@ -855,68 +853,15 @@ The attribute `match` can be an exact match or it can be
a string that
conforms to the Apache ActiveMQ Artemis wildcard syntax (described in
[Wildcard Syntax](wildcard-syntax.md)).
The element `redistribution-delay` defines the delay in milliseconds
-after the last consumer is closed on a queue before redistributing
-messages from that queue to other nodes of the cluster which do have
-matching consumers. A delay of zero means the messages will be
-immediately redistributed. A value of `-1` signifies that messages will
-never be redistributed. The default value is `-1`.
+between detecting the need for redistribution and actually attempting
redistribution.
+A delay of zero means the messages will be immediately redistributed.
+A value of `-1` signifies that messages will never be redistributed. The
default value is `-1`.
It often makes sense to introduce a delay before redistributing as it's
a common case that a consumer closes but another one quickly is created
on the same queue, in such a case you probably don't want to
redistribute immediately since the new consumer will arrive shortly.
-#### Redistribution and filters (selectors)
-
-Although "initial distribution" (described above) does support filters
-(selectors), redistribution does *not* support filters. Consider this
-scenario:
-
- 1. A cluster of 2 nodes - `A` and `B` - using a `redistribution-delay` of
- `0` and a `message-load-balancing` of `ON_DEMAND`.
- 1. `A` and `B` each has the queue `foo`.
- 1. A producer sends a message which is routed to queue `foo` on node `A`.
- The message has property named `myProperty` with a value of `10`.
- 1. A consumer connects to queue `foo` on node `A` with the filter
- `myProperty=5`. This filter doesn't match the message.
- 1. A consumer connects to queue `foo` on node `B` with the filter
- `myProperty=10`. This filter *does* match the message .
-
-Despite the fact that the filter of the consumer on queue `foo` on node `B`
-matches the message, the message will *not* be redistributed from node `A` to
-node `B` because a consumer for the queue exists on node `A`.
-
-Not supporting redistribution based on filters was an explicit design decision
-in order to avoid two main problems - queue scanning and unnecessary
-redistribution.
-
-From a performance perspective a consumer with a filter on a queue is already
-costly due to the scanning that the broker must do on the queue to find
-matching messages. In general, this is a bit of an anti-pattern as it turns
-the broker into something akin to a database where you can "select" the data
-you want using a filter. If brokers are configured in a cluster and a consumer
-with a filter connects and no matches are found after scanning the local queue
-then potentially every instance of that queue in the cluster would need to be
-scanned. This turns into a bit of a scalability nightmare with lots of
consumers
-(especially short-lived consumers) with filters connecting & disconnecting
-frequently. The time & computing resources used for queue scanning would go
-through the roof.
-
-It is also possible to get into a pathological situation where short-lived
-consumers with filters connect to nodes around the cluster and messages get
-redistributed back and forth between nodes without ever actually being
consumed.
-
-One common use-case for consumers with filters (selectors) on queues is
-request/reply using a correlation ID. Following the standard pattern can be
-problematic in a cluster due to the lack of redistribution based on filters
-already described. However, there is a simple way to ensure an application
-using this request/reply pattern gets its reply even when using a correlation
-ID filter in a cluster - create the consumer before the request is sent. This
-will ensure that when the reply is sent it will be routed the proper cluster
-node since "*initial* distribution" (described above) does support filters.
-For example, in the scenario outlined above if steps 3 and 5 were switched
-(i.e. if the consumers were created before the message was sent) then the
-consumer on node `B` would in fact receive the message.
## Cluster topologies
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java
new file mode 100644
index 0000000..38cd430
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/SelectorRedistributionClusterTest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.cluster;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.tests.util.JMSClusteredTestBase;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SelectorRedistributionClusterTest extends JMSClusteredTestBase {
+
+ private final String myQueue = "myQueue";
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ jmsServer1.getActiveMQServer().setIdentity("Server 1");
+ jmsServer2.getActiveMQServer().setIdentity("Server 2");
+ }
+
+ @Override
+ protected boolean enablePersistence() {
+ return true;
+ }
+
+ @Test
+ public void testSelectorRoutingReDistributionOnNoConsumer() throws
Exception {
+
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+ Connection conn1 = cf1.createConnection();
+ Connection conn2 = cf2.createConnection();
+ conn1.start();
+ conn2.start();
+
+ try {
+ Session session1 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ Session session2 = conn2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+ MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage textMessage = session1.createTextMessage("m1");
+ textMessage.setIntProperty("N", 10);
+
+
+ // remote demand with a filter in advance of send, so routing sees
remote filter match and can ignore the local consumer
+ MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
+ // local consumer that does not match message
+ MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
+
+ // verify cluster notifications have completed before send
+ waitForBindings(server1, myQueue, false, 1, 1, 4000);
+
+ prod1.send(textMessage);
+
+ TextMessage received = (TextMessage) cons2.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ // lets check some redistribution back by close with no acknowledge
+ session2.close();
+
+ // consumer on server 1 does not match, redistribution not done yet,
message still available to local consumer
+ session2 = conn2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ cons2 = session2.createConsumer(jmsQueue, "N = 10");
+ received = (TextMessage) cons2.receive(4000);
+ assertNotNull(received);
+
+ // have to create consumer matching filter on server1 in advance such
that redistribution happens fast
+ MessageConsumer cons11 = session1.createConsumer(jmsQueue, "N = 10");
+
+ // now expect redistribution
+ session2.close();
+
+ // get it from server1
+ received = (TextMessage) cons11.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ // done
+ received.acknowledge();
+
+ } finally {
+ conn1.close();
+ conn2.close();
+ }
+ }
+
+ @Test
+ public void
testSelectorRoutingNoReDistributionNewMessageSkipsTillLocalClose() throws
Exception {
+
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+ Connection conn1 = cf1.createConnection();
+ Connection conn2 = cf2.createConnection();
+ conn1.start();
+ conn2.start();
+
+ try {
+ Session session1 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Session session11 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ Session session2 = conn2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+ MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage textMessage = session1.createTextMessage("m1");
+ textMessage.setIntProperty("N", 10);
+
+
+ // remote demand with a filter in advance of send
+ MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
+ // local consumer that does not match message
+ MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
+ // local consumer that matches message
+ MessageConsumer cons11 = session11.createConsumer(jmsQueue, "N = 10");
+
+ // verify cluster notifications have completed before send
+ waitForBindings(server1, myQueue, false, 1, 1, 4000);
+
+ prod1.send(textMessage);
+
+ TextMessage received = (TextMessage) cons11.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ // lets check some redistribution by close with no acknowledge so
session rolls back delivery
+ session11.close();
+
+ // nothing for the existing remote binding
+ received = (TextMessage) cons2.receiveNoWait();
+ assertNull(received);
+
+ // send a second message, it will get routed to the remote binding
+ textMessage = session1.createTextMessage("m2");
+ textMessage.setIntProperty("N", 10);
+
+ prod1.send(textMessage);
+
+ received = (TextMessage) cons2.receive(4000);
+ assertNotNull(received);
+ assertEquals("m2", received.getText());
+ received.acknowledge();
+
+ // release the local consumer such that redistribution kicks in
+ session1.close();
+
+ received = (TextMessage) cons2.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ // done
+ received.acknowledge();
+
+ } finally {
+ conn1.close();
+ conn2.close();
+ }
+ }
+
+
+ @Test
+ public void testSelectorRoutingReDistributionDoesNotBlockLocalConsumer()
throws Exception {
+
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+ Connection conn1 = cf1.createConnection();
+ Connection conn2 = cf2.createConnection();
+ conn1.start();
+ conn2.start();
+
+ try {
+ Session session1 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Session session11 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ Session session2 = conn2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+ MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage textMessage = session1.createTextMessage("m1");
+ textMessage.setIntProperty("N", 10);
+
+
+ // local consumers that does not match message
+ MessageConsumer cons1_0 = session1.createConsumer(jmsQueue, "N = 0");
+ MessageConsumer cons1_1 = session1.createConsumer(jmsQueue, "N = 1");
+
+ // local consumer that matches message
+ MessageConsumer cons1_10 = session11.createConsumer(jmsQueue, "N =
10");
+
+ prod1.send(textMessage);
+
+ TextMessage received = (TextMessage) cons1_10.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ // lets check some redistribution by close with no acknowledge so
session rolls back delivery
+ session11.close();
+
+ // remote demand with a filter, consumer moved triggers
redistribution event b/c all local consumers don't match
+ MessageConsumer cons2_10 = session2.createConsumer(jmsQueue, "N =
10");
+
+ received = (TextMessage) cons2_10.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+ received.acknowledge();
+
+ // check local consumers can still get dispatched
+ textMessage = session1.createTextMessage("m2");
+ textMessage.setIntProperty("N", 0);
+ prod1.send(textMessage);
+
+ textMessage = session1.createTextMessage("m3");
+ textMessage.setIntProperty("N", 1);
+ prod1.send(textMessage);
+
+
+ received = (TextMessage) cons1_0.receive(4000);
+ assertNotNull(received);
+ assertEquals("m2", received.getText());
+ received.acknowledge();
+
+ received = (TextMessage) cons1_1.receive(4000);
+ assertNotNull(received);
+ assertEquals("m3", received.getText());
+ received.acknowledge();
+
+ // verify redistributor still kicks in too
+ textMessage = session1.createTextMessage("m4");
+ textMessage.setIntProperty("N", 10);
+ prod1.send(textMessage);
+
+ received = (TextMessage) cons2_10.receive(4000);
+ assertNotNull(received);
+ assertEquals("m4", received.getText());
+ received.acknowledge();
+
+ } finally {
+ conn1.close();
+ conn2.close();
+ }
+ }
+
+
+ @Test
+ public void testSelectorRoutingReDistributionOnConsumerMove() throws
Exception {
+
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+ Connection conn1 = cf1.createConnection();
+ Connection conn2 = cf2.createConnection();
+ conn1.start();
+ conn2.start();
+
+ try {
+ Session session1 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Session session11 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ Session session2 = conn2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+ MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage textMessage = session1.createTextMessage("m1");
+ textMessage.setIntProperty("N", 10);
+
+
+ // local consumers that does not match message
+ MessageConsumer cons1 = session1.createConsumer(jmsQueue, "N = 0");
+ MessageConsumer cons12 = session1.createConsumer(jmsQueue, "N = 1");
+
+ // local consumer that matches message
+ MessageConsumer cons111 = session11.createConsumer(jmsQueue, "N =
10");
+
+ prod1.send(textMessage);
+
+ TextMessage received = (TextMessage) cons111.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ // lets check some redistribution by close with no acknowledge so
session rolls back delivery
+ session11.close();
+
+ // remote demand with a filter, consumer moved triggers
redistribution event b/c all local consumers don't match
+ MessageConsumer cons2 = session2.createConsumer(jmsQueue, "N = 10");
+
+ received = (TextMessage) cons2.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ received.acknowledge();
+
+ } finally {
+ conn1.close();
+ conn2.close();
+ }
+ }
+
+ @Test
+ public void
testSelectorRoutingReDistributionOnLocalNoMatchConsumerCloseNeedsNewRemoteDemand()
throws Exception {
+
server1.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+
server2.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(true).setAutoCreateAddresses(true).setRedistributionDelay(0);
+ Connection conn1 = cf1.createConnection();
+ Connection conn2 = cf2.createConnection();
+ conn1.start();
+ conn2.start();
+
+ try {
+ Session session1 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Session session1_n_10 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ Session session1_n_1 = conn1.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ Session session2 = conn2.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+
+ javax.jms.Queue jmsQueue = session1.createQueue(myQueue);
+
+
+ MessageProducer prod1 = session1.createProducer(jmsQueue);
+
+ prod1.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ TextMessage textMessage = session1.createTextMessage("m1");
+ textMessage.setIntProperty("N", 10);
+
+
+ // remote demand with a filter
+ MessageConsumer consumer2_n_10 = session2.createConsumer(jmsQueue, "N
= 10");
+
+ // local consumers that does not match message
+ MessageConsumer consumer1_n_0 = session1.createConsumer(jmsQueue, "N
= 0");
+ MessageConsumer consumer1_n_1 = session1_n_1.createConsumer(jmsQueue,
"N = 1");
+
+ // local consumer that matches message
+ MessageConsumer consumer1_n_10 =
session1_n_10.createConsumer(jmsQueue, "N = 10");
+
+ // verify cluster notifications have completed before send
+ waitForBindings(server1, myQueue, false, 1, 1, 4000);
+ waitForBindings(server1, myQueue, true, 1, 3, 4000);
+
+ prod1.send(textMessage);
+
+ TextMessage received = (TextMessage) consumer1_n_10.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ // lets prepare some non matching message for redistribution by close
with no acknowledge so session rolls back delivery
+ session1_n_10.close();
+
+ // verify no redistribution event yet
+ assertNull(consumer2_n_10.receiveNoWait());
+
+ // local remove consumer event will not trigger redistribution
+ session1_n_1.close();
+
+ // verify no redistribution event yet
+ assertNull(consumer2_n_10.receiveNoWait());
+
+ // force a redistribution event on new remote consumer creation (that
won't match in this case), trigger redistribution
+ MessageConsumer consumer2_n_0 = session2.createConsumer(jmsQueue, "N
= 0");
+
+ // verify redistribution to remote
+ received = (TextMessage) consumer2_n_10.receive(4000);
+ assertNotNull(received);
+ assertEquals("m1", received.getText());
+
+ received.acknowledge();
+
+ } finally {
+ conn1.close();
+ conn2.close();
+ }
+ }
+}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 06f78b2..fb2fdc5 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -439,7 +439,7 @@ public class FakeQueue extends CriticalComponentImpl
implements Queue {
}
@Override
- public void cancelRedistributor() throws Exception {
+ public void cancelRedistributor() {
// no-op
}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 47555df..0493c8b 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -967,6 +967,104 @@ public class QueueImplTest extends ActiveMQTestBase {
Assert.assertEquals(20, queue.getDeliveringCount());
}
+
+ @Test
+ public void testNoMatchConsumersAllowsRedistribution() throws Exception {
+ QueueImpl queue = getTemporaryQueue();
+
+ final int numMessages = 2;
+ List<MessageReference> refs = new ArrayList<>();
+
+ for (int i = 0; i < numMessages; i++) {
+ MessageReference ref = generateReference(queue, i);
+ ref.getMessage().putStringProperty("color", "red");
+ refs.add(ref);
+
+ queue.addTail(ref);
+ }
+
+ Assert.assertEquals(numMessages, getMessageCount(queue));
+ queue.deliverNow();
+
+ Assert.assertEquals(numMessages, getMessageCount(queue));
+
+ FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color
= 'green'"));
+ queue.addConsumer(consumer);
+
+ FakeConsumer consumer2 = new FakeConsumer(FilterImpl.createFilter("color
= 'orange'"));
+ queue.addConsumer(consumer2);
+
+ queue.deliverNow();
+ Assert.assertEquals(0, consumer.getReferences().size());
+ Assert.assertEquals(0, consumer2.getReferences().size());
+
+ // verify redistributor is doing some work....
+ try {
+ // should attempt to add due to unmatched
+ queue.addRedistributor(0);
+ fail("expect error on attempt to add addRedistributor - npe b/c no
storage etc");
+ } catch (NullPointerException expected) {
+ }
+
+ // verify with odd number as check depends on order/reset/wrap of
consumers
+ FakeConsumer consumer3 = new FakeConsumer(FilterImpl.createFilter("color
= 'blue'"));
+ queue.addConsumer(consumer3);
+
+ queue.deliverNow();
+
+ Assert.assertEquals(0, consumer.getReferences().size());
+ Assert.assertEquals(0, consumer2.getReferences().size());
+ Assert.assertEquals(0, consumer3.getReferences().size());
+
+ // verify redistributor is doing some work....
+ try {
+ // should attempt to add due to unmatched
+ queue.addRedistributor(0);
+ fail("expect error on attempt to add addRedistributor - npe b/c no
storage etc");
+ } catch (NullPointerException expected) {
+ }
+
+ Assert.assertEquals(numMessages, getMessageCount(queue));
+ }
+
+ @Test
+ public void testNoMatchOn3AllowsRedistribution() throws Exception {
+ QueueImpl queue = getTemporaryQueue();
+
+ int i = 0;
+ MessageReference ref = generateReference(queue, i++);
+ ref.getMessage().putStringProperty("color", "red");
+ queue.addTail(ref);
+
+ ref = generateReference(queue, i++);
+ ref.getMessage().putStringProperty("color", "red");
+ queue.addTail(ref);
+
+ ref = generateReference(queue, i++);
+ ref.getMessage().putStringProperty("color", "blue");
+ queue.addTail(ref);
+
+ Assert.assertEquals(3, getMessageCount(queue));
+
+ FakeConsumer consumerRed = new
FakeConsumer(FilterImpl.createFilter("color = 'red'"));
+ queue.addConsumer(consumerRed);
+
+ FakeConsumer consumerOrange = new
FakeConsumer(FilterImpl.createFilter("color = 'orange'"));
+ queue.addConsumer(consumerOrange);
+
+ queue.deliverNow();
+ Assert.assertEquals(2, consumerRed.getReferences().size());
+ Assert.assertEquals(0, consumerOrange.getReferences().size());
+
+ // verify redistributor is doing some work....
+ try {
+ // should attempt to add due to unmatched
+ queue.addRedistributor(0);
+ fail("expect error on attempt to add addRedistributor - npe b/c no
storage etc");
+ } catch (NullPointerException expected) {
+ }
+ }
+
// Private
------------------------------------------------------------------------------
private void testConsumerWithFilters(final boolean direct) throws Exception
{
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 2a5a330..fe31cfa 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -157,4 +157,11 @@ public class FakeConsumer implements Consumer {
return Collections.emptyList();
}
+ @Override
+ public String toString() {
+ if (filter != null) {
+ return filter + ", " + super.toString();
+ }
+ return super.toString();
+ }
}