Author: rajdavies
Date: Wed May 23 10:25:34 2012
New Revision: 1341820
URL: http://svn.apache.org/viewvc?rev=1341820&view=rev
Log:
Additional fixes for https://issues.apache.org/jira/browse/AMQ-3855 - timing
issue in adding wildcard subscriptions can result in duplicate messages sent to
MQTT
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1341820&r1=1341819&r2=1341820&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Wed May 23 10:25:34 2012
@@ -109,9 +109,6 @@ public class Topic extends BaseDestinati
}
public void addSubscription(ConnectionContext context, final Subscription
sub) throws Exception {
-
- super.addSubscription(context, sub);
-
if (!sub.getConsumerInfo().isDurable()) {
// Do a retroactive recovery if needed.
@@ -121,23 +118,34 @@ public class Topic extends BaseDestinati
// while we are recovering a subscription to avoid out of
order messages.
dispatchLock.writeLock().lock();
try {
+ boolean applyRecovery = false;
synchronized (consumers) {
- sub.add(context, this);
- consumers.add(sub);
+ if (!consumers.contains(sub)){
+ sub.add(context, this);
+ consumers.add(sub);
+ applyRecovery=true;
+ super.addSubscription(context, sub);
+ }
+ }
+ if (applyRecovery){
+ subscriptionRecoveryPolicy.recover(context, this, sub);
}
- subscriptionRecoveryPolicy.recover(context, this, sub);
} finally {
dispatchLock.writeLock().unlock();
}
} else {
synchronized (consumers) {
- sub.add(context, this);
- consumers.add(sub);
+ if (!consumers.contains(sub)){
+ sub.add(context, this);
+ consumers.add(sub);
+ super.addSubscription(context, sub);
+ }
}
}
} else {
DurableTopicSubscription dsub = (DurableTopicSubscription) sub;
+ super.addSubscription(context, sub);
sub.add(context, this);
if(dsub.isActive()) {
synchronized (consumers) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1341820&r1=1341819&r2=1341820&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Wed May 23 10:25:34 2012
@@ -255,7 +255,7 @@ public class MQTTTest {
javax.jms.Topic jmsTopic = s.createTopic("foo.far");
MessageProducer producer = s.createProducer(jmsTopic);
- Topic[] topics = {new Topic(utf8("foo/far"), QoS.AT_MOST_ONCE)};
+ Topic[] topics = {new Topic(utf8("foo/+"), QoS.AT_MOST_ONCE)};
connection.subscribe(topics);
for (int i = 0; i < numberOfMessages; i++) {
String payload = "This is Test Message: " + i;