buildbot failure in ASF Buildbot on activemq-site-production

2014-05-26 Thread buildbot
The Buildbot has detected a new failure on builder activemq-site-production 
while building ASF Buildbot.
Full details are available at:
 http://ci.apache.org/builders/activemq-site-production/builds/826

Buildbot URL: http://ci.apache.org/

Buildslave for this Build: bb-cms-slave

Build Reason: The Nightly scheduler named 'activemq-site-production' triggered 
this build
Build Source Stamp: [branch activemq/activemq-website] HEAD
Blamelist: 

BUILD FAILED: failed compile

sincerely,
 -The Buildbot





Build failed in Jenkins: ActiveMQ-Java7 » ActiveMQ :: Unit Tests #337

2014-05-26 Thread Apache Jenkins Server
See 
https://builds.apache.org/job/ActiveMQ-Java7/org.apache.activemq$activemq-unit-tests/337/changes

Changes:

[claus.ibsen] Added close destroy-method to Postgres data source. Thanks to 
Jakub Korab for reporting in https://github.com/apache/activemq/pull/19

--
[...truncated 525 lines...]
Running org.apache.activemq.config.BrokerPropertiesTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.661 sec - in 
org.apache.activemq.config.BrokerPropertiesTest
Running org.apache.activemq.config.BrokerXmlConfigTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.995 sec - in 
org.apache.activemq.config.BrokerXmlConfigTest
Running org.apache.activemq.config.JDBCConfigTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 7.017 sec - in 
org.apache.activemq.config.JDBCConfigTest
Running org.apache.activemq.console.command.AMQ3410Test
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 16.656 sec - in 
org.apache.activemq.console.command.AMQ3410Test
Running org.apache.activemq.console.command.AMQ3411Test
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 10.478 sec - in 
org.apache.activemq.console.command.AMQ3411Test
Running org.apache.activemq.console.command.PurgeCommandTest
Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 12.175 sec - in 
org.apache.activemq.console.command.PurgeCommandTest
Running org.apache.activemq.conversions.AmqpAndMqttTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.711 sec - in 
org.apache.activemq.conversions.AmqpAndMqttTest
Running org.apache.activemq.filter.DestinationFilterTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.069 sec - in 
org.apache.activemq.filter.DestinationFilterTest
Running org.apache.activemq.filter.DestinationMapMemoryTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.1 sec - in 
org.apache.activemq.filter.DestinationMapMemoryTest
Running org.apache.activemq.filter.DestinationMapTempDestinationTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.311 sec - in 
org.apache.activemq.filter.DestinationMapTempDestinationTest
Running org.apache.activemq.filter.DestinationMapTest
Tests run: 16, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.084 sec - in 
org.apache.activemq.filter.DestinationMapTest
Running org.apache.activemq.filter.DestinationPathTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.145 sec - in 
org.apache.activemq.filter.DestinationPathTest
Running org.apache.activemq.filter.DummyPolicyTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.458 sec - in 
org.apache.activemq.filter.DummyPolicyTest
Running org.apache.activemq.jmx.JmxCreateNCTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.246 sec - in 
org.apache.activemq.jmx.JmxCreateNCTest
Running org.apache.activemq.jmx.OpenTypeSupportTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.262 sec - in 
org.apache.activemq.jmx.OpenTypeSupportTest
Running org.apache.activemq.jndi.ActiveMQInitialContextFactoryTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.212 sec - in 
org.apache.activemq.jndi.ActiveMQInitialContextFactoryTest
Running org.apache.activemq.jndi.ActiveMQWASInitialContextFactoryTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.197 sec - in 
org.apache.activemq.jndi.ActiveMQWASInitialContextFactoryTest
Running org.apache.activemq.jndi.CustomConnectionFactoryNameTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.213 sec - in 
org.apache.activemq.jndi.CustomConnectionFactoryNameTest
Running org.apache.activemq.jndi.DestinationNameWithSlashTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.198 sec - in 
org.apache.activemq.jndi.DestinationNameWithSlashTest
Running org.apache.activemq.jndi.InitialContextTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.212 sec - in 
org.apache.activemq.jndi.InitialContextTest
Running org.apache.activemq.jndi.ObjectFactoryTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.197 sec - in 
org.apache.activemq.jndi.ObjectFactoryTest
Running org.apache.activemq.jndi.XAConnectionFactoryTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.208 sec - in 
org.apache.activemq.jndi.XAConnectionFactoryTest
Running org.apache.activemq.joramtests.JoramJmsTest
Tests run: 196, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 12.876 sec - 
in org.apache.activemq.joramtests.JoramJmsTest
Running org.apache.activemq.leveldb.LevelDBStoreBrokerTest
Tests run: 129, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 59.882 sec - 
in org.apache.activemq.leveldb.LevelDBStoreBrokerTest
Running org.apache.activemq.management.BoundaryStatisticTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec - in 

Jenkins build became unstable: ActiveMQ-Java7 » ActiveMQ :: AMQP #337

2014-05-26 Thread Apache Jenkins Server
See 
https://builds.apache.org/job/ActiveMQ-Java7/org.apache.activemq$activemq-amqp/337/changes



Jenkins build became unstable: ActiveMQ-Java7 » ActiveMQ :: LevelDB Store #337

2014-05-26 Thread Apache Jenkins Server
See 
https://builds.apache.org/job/ActiveMQ-Java7/org.apache.activemq$activemq-leveldb-store/337/



Build failed in Jenkins: ActiveMQ-Java7 #337

2014-05-26 Thread Apache Jenkins Server
See https://builds.apache.org/job/ActiveMQ-Java7/337/changes

Changes:

[jsherman] Add Jetty host property to resolve URL message

[claus.ibsen] Added close destroy-method to Postgres data source. Thanks to 
Jakub Korab for reporting in https://github.com/apache/activemq/pull/19

[tabish121] Turn down default logging to DEBUG for tests.

[tabish121] Pull out JMS client common test bits into a test support class.

[tabish121] https://issues.apache.org/jira/browse/AMQ-5183

[tabish121] Add an option to allow test cases to easily change the transformer

[claus.ibsen] Upgraded to Camel 2.13.1

[dejan] https://issues.apache.org/jira/browse/AMQ-5186 - remove amqp producers

[tabish121] Add option to turn on an OpenWire TCP endpoint.

[tabish121] Use a fixed buffer to handle incoming deliveries from proton via the

[tabish121] https://issues.apache.org/jira/browse/AMQ-5195

[tabish121] https://issues.apache.org/jira/browse/AMQ-5195

--
[...truncated 3563 lines...]
[INFO] 
[INFO] 
[INFO] 
[INFO] Skipping ActiveMQ :: Web Console
[INFO] This project has been banned from the build due to previous failures.
[INFO] 
[INFO] 
[INFO] 
[INFO] Skipping ActiveMQ :: Karaf Integration Tests
[INFO] This project has been banned from the build due to previous failures.
[INFO] 
[INFO] 
[INFO] 
[INFO] Skipping ActiveMQ :: Integration Test :: Spring 3.1
[INFO] This project has been banned from the build due to previous failures.
[INFO] 
[INFO] 
[INFO] 
[INFO] Skipping ActiveMQ :: Assembly
[INFO] This project has been banned from the build due to previous failures.
[INFO] 
[INFO] 
[INFO] Reactor Summary:
[INFO] 
[INFO] ActiveMQ .. SUCCESS [19.351s]
[INFO] ActiveMQ :: Openwire Generator  SUCCESS [12.782s]
[INFO] ActiveMQ :: Client  SUCCESS [2:02.637s]
[INFO] ActiveMQ :: Openwire Legacy Support ... SUCCESS [15.107s]
[INFO] ActiveMQ :: JAAS .. SUCCESS [56.536s]
[INFO] ActiveMQ :: Broker  SUCCESS [1:35.977s]
[INFO] ActiveMQ :: KahaDB Store .. SUCCESS [6:49.898s]
[INFO] ActiveMQ :: STOMP Protocol  SUCCESS [24:14.999s]
[INFO] ActiveMQ :: MQTT Protocol . SUCCESS [9:35.916s]
[INFO] ActiveMQ :: JDBC Store  SUCCESS [12.853s]
[INFO] ActiveMQ :: LevelDB Store . SUCCESS [20:09.339s]
[INFO] ActiveMQ :: Generic JMS Pool .. SUCCESS [1:32.926s]
[INFO] ActiveMQ :: Pool .. SUCCESS [12.586s]
[INFO] ActiveMQ :: RA  SUCCESS [2:45.345s]
[INFO] ActiveMQ :: Spring  SUCCESS [1:23.399s]
[INFO] ActiveMQ :: AMQP .. SUCCESS [18:56.547s]
[INFO] ActiveMQ :: Console ... SUCCESS [14.531s]
[INFO] ActiveMQ :: Partition Management .. SUCCESS [26.208s]
[INFO] ActiveMQ :: Unit Tests  FAILURE 
[1:43:04.471s]
[INFO] ActiveMQ :: Camel . SKIPPED
[INFO] ActiveMQ :: HTTP Protocol Support . SKIPPED
[INFO] ActiveMQ :: All JAR bundle  SKIPPED
[INFO] ActiveMQ :: File Server ... SKIPPED
[INFO] ActiveMQ :: Log4j Appender  SUCCESS [11.173s]
[INFO] ActiveMQ :: Apache Karaf .. SKIPPED
[INFO] ActiveMQ :: RAR ... SKIPPED
[INFO] ActiveMQ :: Run Jar ... SUCCESS [6.544s]
[INFO] ActiveMQ :: Shiro . SKIPPED
[INFO] ActiveMQ :: Runtime Configuration . SUCCESS [4:30.341s]
[INFO] ActiveMQ :: Tooling ... SUCCESS [5.612s]
[INFO] ActiveMQ :: Memory Usage Test Plugin .. SUCCESS 

buildbot success in ASF Buildbot on activemq-site-production

2014-05-26 Thread buildbot
The Buildbot has detected a restored build on builder activemq-site-production 
while building ASF Buildbot.
Full details are available at:
 http://ci.apache.org/builders/activemq-site-production/builds/827

Buildbot URL: http://ci.apache.org/

Buildslave for this Build: bb-cms-slave

Build Reason: The Nightly scheduler named 'activemq-site-production' triggered 
this build
Build Source Stamp: [branch activemq/activemq-website] HEAD
Blamelist: 

Build succeeded!

sincerely,
 -The Buildbot





buildbot failure in ASF Buildbot on activemq-site-production

2014-05-26 Thread buildbot
The Buildbot has detected a new failure on builder activemq-site-production 
while building ASF Buildbot.
Full details are available at:
 http://ci.apache.org/builders/activemq-site-production/builds/828

Buildbot URL: http://ci.apache.org/

Buildslave for this Build: bb-cms-slave

Build Reason: The Nightly scheduler named 'activemq-site-production' triggered 
this build
Build Source Stamp: [branch activemq/activemq-website] HEAD
Blamelist: 

BUILD FAILED: failed compile

sincerely,
 -The Buildbot





[12/13] git commit: Fixed AMQ-5160, fixed durable subscription retroactive recovery

2014-05-26 Thread dejanb
Fixed AMQ-5160, fixed durable subscription retroactive recovery


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c859676
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c859676
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c859676

Branch: refs/heads/trunk
Commit: 6c859676b3995334e96c16c47653ec72fa70f729
Parents: 42ad103
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Fri May 16 14:21:19 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../broker/region/DurableTopicSubscription.java | 18 ---
 .../broker/region/PrefetchSubscription.java |  3 +-
 .../transport/mqtt/MQTTProtocolConverter.java   | 21 
 .../activemq/transport/mqtt/MQTTTest.java   | 34 ++--
 4 files changed, 55 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index e61a608..4c19c62 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
@@ -120,9 +119,6 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
 if (active.get() || keepDurableSubsActive) {
 Topic topic = (Topic) destination;
 topic.activate(context, this);
-if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
-topic.recoverRetroactiveMessages(context, this);
-}
 this.enqueueCounter += pending.size();
 } else if (destination.getMessageStore() != null) {
 TopicMessageStore store = (TopicMessageStore) 
destination.getMessageStore();
@@ -172,12 +168,12 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
 pending.setMaxAuditDepth(getMaxAuditDepth());
 pending.setMaxProducersToAudit(getMaxProducersToAudit());
 pending.start();
-// use recovery policy for retroactive topics and consumers
-for (Destination destination : 
durableDestinations.values()) {
-Topic topic = (Topic) destination;
-if (topic.isAlwaysRetroactive() || 
info.isRetroactive()) {
-topic.recoverRetroactiveMessages(context, this);
-}
+}
+// use recovery policy every time sub is activated for 
retroactive topics and consumers
+for (Destination destination : durableDestinations.values()) {
+Topic topic = (Topic) destination;
+if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
+topic.recoverRetroactiveMessages(context, this);
 }
 }
 }
@@ -277,7 +273,7 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
 }
 
 @Override
-protected void dispatchPending() throws IOException {
+public void dispatchPending() throws IOException {
 if (isActive()) {
 super.dispatchPending();
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index ff4c0aa..5ba3b53 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -633,7 +633,8 @@ public abstract class PrefetchSubscription extends 
AbstractSubscription {
 dispatched.removeAll(references);
 }
 
-protected void dispatchPending() throws IOException {
+// made public so it can be used in 

[05/13] git commit: Fixed AMQ-5160, fixed race condition for retained messages

2014-05-26 Thread dejanb
Fixed AMQ-5160, fixed race condition for retained messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/86440903
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/86440903
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/86440903

Branch: refs/heads/trunk
Commit: 8644090377eeef09a5afb2e46594c4fa4c311aae
Parents: c915b19
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 12:16:44 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../transport/mqtt/MQTTProtocolConverter.java   | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/86440903/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
--
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 71a6fcf..88e684e 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -386,6 +386,10 @@ public class MQTTProtocolConverter {
 }
 MQTTSubscription mqttSubscription = new MQTTSubscription(this, 
topicQoS, consumerInfo);
 
+// optimistic add to local maps first to be able to handle commands in 
onActiveMQCommand
+subscriptionsByConsumerId.put(id, mqttSubscription);
+mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+
 final byte[] qos = {-1};
 sendToActiveMQ(consumerInfo, new ResponseHandler() {
 @Override
@@ -401,9 +405,10 @@ public class MQTTProtocolConverter {
 }
 });
 
-if (qos[0] != SUBSCRIBE_ERROR) {
-subscriptionsByConsumerId.put(id, mqttSubscription);
-mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+if (qos[0] == SUBSCRIBE_ERROR) {
+// remove from local maps if subscribe failed
+subscriptionsByConsumerId.remove(id);
+mqttSubscriptionByTopic.remove(topicName);
 }
 
 return qos[0];
@@ -431,7 +436,7 @@ public class MQTTProtocolConverter {
 final Setorg.apache.activemq.broker.region.Destination 
matchingDestinations = topicRegion.getDestinations(destination);
 for (org.apache.activemq.broker.region.Destination dest : 
matchingDestinations) {
 
-// recover retroactive messages for matching subscriptions
+// recover retroactive messages for matching subscription
 for (Subscription subscription : dest.getConsumers()) {
 if 
(subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
 try {
@@ -440,6 +445,7 @@ public class MQTTProtocolConverter {
 throw new MQTTProtocolException(Error recovering 
retained messages for  +
 dest.getName() + :  + e.getMessage(), false, e);
 }
+break;
 }
 }
 }
@@ -483,7 +489,7 @@ public class MQTTProtocolConverter {
 }
 
 /**
- * Dispatch a ActiveMQ command
+ * Dispatch an ActiveMQ command
  */
 public void onActiveMQCommand(Command command) throws Exception {
 if (command.isResponse()) {



[07/13] git commit: Fixed AMQ-5160, removed redundant MQTTRetainedMessages

2014-05-26 Thread dejanb
Fixed AMQ-5160, removed redundant MQTTRetainedMessages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b36adffe
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b36adffe
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b36adffe

Branch: refs/heads/trunk
Commit: b36adffe71f782d676bcddf41997fec4d1f831d2
Parents: 5576dc5
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Mon May 12 20:25:11 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../transport/mqtt/MQTTRetainedMessages.java| 95 
 1 file changed, 95 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/b36adffe/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
--
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
deleted file mode 100644
index 250366d..000
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the License); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an AS IS BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import org.apache.activemq.Service;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.filter.DestinationMapNode;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.mqtt.codec.PUBLISH;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class MQTTRetainedMessages extends ServiceSupport {
-private static final Logger LOG = 
LoggerFactory.getLogger(MQTTRetainedMessages.class);
-private static final Object LOCK = new Object();
-
-DestinationMapNode retainedMessages = new DestinationMapNode(null);
-
-private MQTTRetainedMessages(){
-}
-
-@Override
-protected void doStop(ServiceStopper stopper) throws Exception {
-synchronized (this) {
-retainedMessages = new DestinationMapNode(null);
-}
-}
-
-@Override
-protected void doStart() throws Exception {
-}
-
-   public void addMessage(ActiveMQTopic dest, PUBLISH publish){
-   synchronized (this) {
-   retainedMessages.set(dest.getDestinationPaths(), 0, publish);
-   }
-   }
-
-   public SetPUBLISH getMessages(ActiveMQTopic topic){
-   Set answer = new HashSet();
-   synchronized (this) {
-   retainedMessages.appendMatchingValues(answer, 
topic.getDestinationPaths(), 0);
-   }
-   return (SetPUBLISH)answer;
-   }
-
-public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService 
broker){
-MQTTRetainedMessages result = null;
-if (broker != null){
-synchronized (LOCK){
-   Service[] services = broker.getServices();
-   if (services != null){
-   for (Service service:services){
-   if (service instanceof MQTTRetainedMessages){
-   return (MQTTRetainedMessages) service;
-   }
-   }
-   }
-   result = new MQTTRetainedMessages();
-broker.addService(result);
-if (broker != null  broker.isStarted()){
-try {
-result.start();
-} catch (Exception e) {
-LOG.warn(Couldn't start MQTTRetainedMessages);
-}
-}
-}
-}
-
-
-return result;
-}
-}



[11/13] git commit: Fixed AMQ-5160, restored previous DurableSubscription behaviour of only recovering messages when cursor is empty, retained messages are always recovered

2014-05-26 Thread dejanb
Fixed AMQ-5160, restored previous DurableSubscription behaviour of only 
recovering messages when cursor is empty, retained messages are always recovered


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/42ad1039
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/42ad1039
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/42ad1039

Branch: refs/heads/trunk
Commit: 42ad1039cb51b47a16f46f3d8f0fe8bf36ffdd1d
Parents: 8644090
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 13:15:04 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../activemq/broker/region/DurableTopicSubscription.java  |  5 +
 .../policy/RetainedMessageSubscriptionRecoveryPolicy.java | 10 +-
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index c82e6ef..e61a608 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -137,6 +137,11 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
 dispatchPending();
 }
 
+// used by RetaineMessageSubscriptionRecoveryPolicy
+public boolean isEmpty(Topic topic) {
+return pending.isEmpty(topic);
+}
+
 public void activate(SystemUsage memoryManager, ConnectionContext context, 
ConsumerInfo info, RegionBroker regionBroker) throws Exception {
 if (!active.get()) {
 this.context = context;

http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index f1573fa..ea07c8b 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
@@ -74,7 +75,14 @@ public class RetainedMessageSubscriptionRecoveryPolicy 
implements SubscriptionRe
 sub.addRecoveredMessage(context, retainedMessage);
 }
 if (wrapped != null) {
-wrapped.recover(context, topic, sub);
+// retain default ActiveMQ behaviour of recovering messages only 
for empty durable subscriptions
+boolean recover = true;
+if (sub instanceof DurableTopicSubscription  
!((DurableTopicSubscription)sub).isEmpty(topic)) {
+recover = false;
+}
+if (recover) {
+wrapped.recover(context, topic, sub);
+}
 }
 }
 



[01/13] git commit: Fixed AMQ-5160, fixed browse() to include messages from wrapped policy

2014-05-26 Thread dejanb
Repository: activemq
Updated Branches:
  refs/heads/trunk ba519d8bd - 6c859676b


Fixed AMQ-5160, fixed browse() to include messages from wrapped policy


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5576dc5d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5576dc5d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5576dc5d

Branch: refs/heads/trunk
Commit: 5576dc5d74e8ccb284dda19d60872a1efb03fcd0
Parents: a581d01
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Mon May 12 19:51:21 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:18 2014 +0200

--
 .../RetainedMessageSubscriptionRecoveryPolicy.java| 14 --
 1 file changed, 12 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/5576dc5d/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index ba2d1a1..fb0313f 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.activemq.broker.Broker;
@@ -83,14 +84,23 @@ public class RetainedMessageSubscriptionRecoveryPolicy 
implements SubscriptionRe
 }
 
 public Message[] browse(ActiveMQDestination destination) throws Exception {
-ListMessage result = new ArrayListMessage();
+final ListMessage result = new ArrayListMessage();
 if (retainedMessage != null) {
 DestinationFilter filter = 
DestinationFilter.parseFilter(destination);
 if (filter.matches(retainedMessage.getMessage().getDestination())) 
{
 result.add(retainedMessage.getMessage());
 }
 }
-return result.toArray(new Message[result.size()]);
+Message[] messages = result.toArray(new Message[result.size()]);
+if (wrapped != null) {
+final Message[] wrappedMessages = wrapped.browse(destination);
+if (wrappedMessages != null  wrappedMessages.length  0) {
+final int origLen = messages.length;
+messages = Arrays.copyOf(messages, origLen + 
wrappedMessages.length);
+System.arraycopy(wrappedMessages, 0, messages, origLen, 
wrappedMessages.length);
+}
+}
+return messages;
 }
 
 public SubscriptionRecoveryPolicy copy() {



[03/13] git commit: Partial fix for AMQ-5160, attempts to resolve retained messages using subscription recovery policy, but fails to resend retained messages for duplicate subscriptions

2014-05-26 Thread dejanb
Partial fix for AMQ-5160, attempts to resolve retained messages using 
subscription recovery policy, but fails to resend retained messages for 
duplicate subscriptions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bcb60a48
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bcb60a48
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bcb60a48

Branch: refs/heads/trunk
Commit: bcb60a482cdc1cd2d2de9b4dba6a38ef831b2fa4
Parents: ba519d8
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Wed May 7 19:05:36 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:18 2014 +0200

--
 .../apache/activemq/broker/region/Topic.java|  14 ++-
 ...tainedMessageSubscriptionRecoveryPolicy.java | 107 +++
 .../transport/mqtt/MQTTProtocolConverter.java   |  71 +---
 .../activemq/transport/mqtt/MQTTTest.java   |  86 ---
 4 files changed, 215 insertions(+), 63 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 277ce05..4744af8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -33,7 +33,7 @@ import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import 
org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
-import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
+import 
org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.util.InsertionCountList;
@@ -91,7 +91,7 @@ public class Topic extends BaseDestination implements Task {
 subscriptionRecoveryPolicy = new 
LastImageSubscriptionRecoveryPolicy();
 setAlwaysRetroactive(true);
 } else {
-subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+subscriptionRecoveryPolicy = new 
RetainedMessageSubscriptionRecoveryPolicy(null);
 }
 this.taskRunner = taskFactory.createTaskRunner(this, Topic   + 
destination.getPhysicalName());
 }
@@ -675,8 +675,14 @@ public class Topic extends BaseDestination implements Task 
{
 return subscriptionRecoveryPolicy;
 }
 
-public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy 
subscriptionRecoveryPolicy) {
-this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
+public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy 
recoveryPolicy) {
+if (this.subscriptionRecoveryPolicy != null  
this.subscriptionRecoveryPolicy instanceof 
RetainedMessageSubscriptionRecoveryPolicy) {
+// allow users to combine retained message policy with other 
ActiveMQ policies
+RetainedMessageSubscriptionRecoveryPolicy policy = 
(RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
+policy.setWrapped(recoveryPolicy);
+} else {
+this.subscriptionRecoveryPolicy = recoveryPolicy;
+}
 }
 
 // Implementation methods

http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
new file mode 100644
index 000..d350a5f
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a 

[13/13] git commit: Fixed AMQ-5160, removed producer's retain property from retained messages

2014-05-26 Thread dejanb
Fixed AMQ-5160, removed producer's retain property from retained messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c915b19a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c915b19a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c915b19a

Branch: refs/heads/trunk
Commit: c915b19a205d37c2faad05178de716dca64981d5
Parents: 78950ec
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 12:15:14 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../region/policy/RetainedMessageSubscriptionRecoveryPolicy.java| 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/c915b19a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index fb0313f..f1573fa 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -56,6 +56,7 @@ public class RetainedMessageSubscriptionRecoveryPolicy 
implements SubscriptionRe
 if (message.getContent().getLength()  0) {
 // non zero length message content
 retainedMessage = message.copy();
+retainedMessage.getMessage().removeProperty(RETAIN_PROPERTY);
 retainedMessage.getMessage().setProperty(RETAINED_PROPERTY, 
true);
 } else {
 // clear retained message



[08/13] git commit: Fixed AMQ-5160, force durable subscriptions to always recover retroactive messages

2014-05-26 Thread dejanb
Fixed AMQ-5160, force durable subscriptions to always recover retroactive 
messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8947a09e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8947a09e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8947a09e

Branch: refs/heads/trunk
Commit: 8947a09eaa075cb8f5e86599404198e0a5c91910
Parents: b36adff
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 00:29:03 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../broker/region/DurableTopicSubscription.java | 16 ++--
 1 file changed, 6 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/8947a09e/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 8cb6ecc..6501e58 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -120,9 +120,8 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
 if (active.get() || keepDurableSubsActive) {
 Topic topic = (Topic) destination;
 topic.activate(context, this);
-if (pending.isEmpty(topic)) {
-topic.recoverRetroactiveMessages(context, this);
-}
+// always use the recovery policy
+topic.recoverRetroactiveMessages(context, this);
 this.enqueueCounter += pending.size();
 } else if (destination.getMessageStore() != null) {
 TopicMessageStore store = (TopicMessageStore) 
destination.getMessageStore();
@@ -167,13 +166,10 @@ public class DurableTopicSubscription extends 
PrefetchSubscription implements Us
 pending.setMaxAuditDepth(getMaxAuditDepth());
 pending.setMaxProducersToAudit(getMaxProducersToAudit());
 pending.start();
-// If nothing was in the persistent store, then try to use 
the
-// recovery policy.
-if (pending.isEmpty()) {
-for (Destination destination : 
durableDestinations.values()) {
-Topic topic = (Topic) destination;
-topic.recoverRetroactiveMessages(context, this);
-}
+// always use the recovery policy.
+for (Destination destination : 
durableDestinations.values()) {
+Topic topic = (Topic) destination;
+topic.recoverRetroactiveMessages(context, this);
 }
 }
 }



[04/13] git commit: Fixed AMQ-5160, polished MQTT tests

2014-05-26 Thread dejanb
Fixed AMQ-5160, polished MQTT tests


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a39782b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a39782b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a39782b

Branch: refs/heads/trunk
Commit: 0a39782bf5d95fc0ae6d54a7fa1469b230621358
Parents: 88c6ee9
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 00:30:11 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../activemq/transport/mqtt/MQTTTest.java   | 31 ++--
 1 file changed, 22 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/0a39782b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
--
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index e11f6e9..3c0701e 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -379,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest {
 connection.publish(topic, (RETAINED + topic).getBytes(), 
QoS.AT_LEAST_ONCE, true);
 
 connection.subscribe(new Topic[] { new Topic(topic, 
QoS.AT_LEAST_ONCE) });
-Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+Message msg = connection.receive(5, TimeUnit.SECONDS);
 assertNotNull(No message for  + topic, msg);
 assertEquals(RETAINED + topic, new String(msg.getPayload()));
 msg.ack();
@@ -406,7 +406,7 @@ public class MQTTTest extends AbstractMQTTTest {
 assertNotEquals(Subscribe failed  + wildcard, (byte)0x80, 
qos[0]);
 
 // test retained messages
-Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+Message msg = connection.receive(5, TimeUnit.SECONDS);
 do {
 assertNotNull(RETAINED null  + wildcard, msg);
 assertTrue(RETAINED prefix  + wildcard, new 
String(msg.getPayload()).startsWith(RETAINED));
@@ -459,7 +459,7 @@ public class MQTTTest extends AbstractMQTTTest {
 final BlockingConnection connection = mqtt.blockingConnection();
 connection.connect();
 connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, 
true);
-connection.subscribe(new Topic[] { new Topic(topic, 
QoS.valueOf(topic)) });
+connection.subscribe(new Topic[]{new Topic(topic, 
QoS.valueOf(topic))});
 
 final Message msg = connection.receive(5000, 
TimeUnit.MILLISECONDS);
 assertNotNull(msg);
@@ -472,7 +472,7 @@ public class MQTTTest extends AbstractMQTTTest {
 assertEquals(i, actualQoS[0]);
 msg.ack();
 
-connection.unsubscribe(new String[] { topic });
+connection.unsubscribe(new String[]{topic});
 connection.disconnect();
 }
 
@@ -1341,10 +1341,9 @@ public class MQTTTest extends AbstractMQTTTest {
 BlockingConnection connectionSub = mqttSub.blockingConnection();
 connectionSub.connect();
 connectionSub.subscribe(topics);
-connectionSub.unsubscribe(new String[] { TopicA });
 connectionSub.disconnect();
 
-for (int i = 0; i  10; i++) {
+for (int i = 0; i  5; i++) {
 String payload = Message  + i;
 connectionPub.publish(topics[0].name().toString(), 
payload.getBytes(), QoS.EXACTLY_ONCE, false);
 }
@@ -1353,14 +1352,28 @@ public class MQTTTest extends AbstractMQTTTest {
 connectionSub.connect();
 
 int received = 0;
-for (int i = 0; i  10; ++i) {
+for (int i = 0; i  5; ++i) {
 Message message = connectionSub.receive(5, TimeUnit.SECONDS);
-assertNotNull(message);
+assertNotNull(Missing message  + i, message);
 LOG.info(Message is  + new String(message.getPayload()));
 received++;
 message.ack();
 }
-assertEquals(10, received);
+assertEquals(5, received);
+
+// unsubscribe from topic
+connectionSub.unsubscribe(new String[]{TopicA});
+
+// send more messages
+for (int i = 0; i  5; i++) {
+String payload = Message  + i;
+connectionPub.publish(topics[0].name().toString(), 
payload.getBytes(), QoS.EXACTLY_ONCE, false);
+}
+
+// these should not be received
+connectionSub = mqttSub.blockingConnection();
+connectionSub.connect();
+  

[10/13] git commit: Fixed AMQ-5160, remove durable subscription in onUnsubscribe()

2014-05-26 Thread dejanb
Fixed AMQ-5160, remove durable subscription in onUnsubscribe()


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/88c6ee97
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/88c6ee97
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/88c6ee97

Branch: refs/heads/trunk
Commit: 88c6ee97e0fc8652f2dba16786cb8cbd7b80a1b7
Parents: 8947a09
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 00:29:40 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../transport/mqtt/MQTTProtocolConverter.java   | 50 
 1 file changed, 31 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/88c6ee97/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
--
diff --git 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index cbb6415..71a6fcf 100644
--- 
a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ 
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -361,12 +361,13 @@ public class MQTTProtocolConverter {
 ActiveMQDestination destination = new 
ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));
 
 if( mqttSubscriptionByTopic.containsKey(topicName)) {
-if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) {
+final MQTTSubscription mqttSubscription = 
mqttSubscriptionByTopic.get(topicName);
+if (topicQoS != mqttSubscription.qos()) {
 // remove old subscription as the QoS has changed
 onUnSubscribe(topicName);
 } else {
 // duplicate SUBSCRIBE packet, find all matching topics and 
resend retained messages
-resendRetainedMessages(topicName, destination);
+resendRetainedMessages(topicName, destination, 
mqttSubscription);
 
 return (byte) topicQoS.ordinal();
 }
@@ -408,7 +409,8 @@ public class MQTTProtocolConverter {
 return qos[0];
 }
 
-private void resendRetainedMessages(UTF8Buffer topicName, 
ActiveMQDestination destination) throws MQTTProtocolException {
+private void resendRetainedMessages(UTF8Buffer topicName, 
ActiveMQDestination destination,
+MQTTSubscription mqttSubscription) 
throws MQTTProtocolException {
 // get TopicRegion
 RegionBroker regionBroker;
 try {
@@ -418,25 +420,26 @@ public class MQTTProtocolConverter {
 }
 final TopicRegion topicRegion = (TopicRegion) 
regionBroker.getTopicRegion();
 
+final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
+final ConsumerId consumerId = consumerInfo.getConsumerId();
+
+// use actual client id used to create connection to lookup connection 
context
+final String connectionInfoClientId = connectionInfo.getClientId();
+final ConnectionContext connectionContext = 
regionBroker.getConnectionContext(connectionInfoClientId);
+
 // get all matching Topics
 final Setorg.apache.activemq.broker.region.Destination 
matchingDestinations = topicRegion.getDestinations(destination);
 for (org.apache.activemq.broker.region.Destination dest : 
matchingDestinations) {
-// find matching MQTT subscription for this client
-final String mqttTopicName = convertActiveMQToMQTT(dest.getName());
-final MQTTSubscription mqttSubscription = 
mqttSubscriptionByTopic.get(new UTF8Buffer(mqttTopicName));
-if (mqttSubscription != null) {
-// recover retroactive messages for matching subscription
-final ConsumerInfo consumerInfo = 
mqttSubscription.getConsumerInfo();
-final ConsumerId consumerId = consumerInfo.getConsumerId();
-final Subscription subscription = 
topicRegion.getSubscriptions().get(consumerId);
-
-// use actual client id used to create connection to lookup 
connection context
-final ConnectionContext connectionContext = 
regionBroker.getConnectionContext(connectionInfo.getClientId());
-try {
-
((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext,
 subscription);
-} catch (Exception e) {
-throw new MQTTProtocolException(Error recovering retained 
messages for  +
-mqttTopicName + :  + e.getMessage(), 

[02/13] git commit: Fixed AMQ-5160, Added RegionBroker.getConnectionContext(), made Topic.recoverRetroactiveMessages() public to force message recovery from MQTTProtocolConverter for duplicate subscri

2014-05-26 Thread dejanb
Fixed AMQ-5160, Added RegionBroker.getConnectionContext(), made 
Topic.recoverRetroactiveMessages() public to force message recovery from 
MQTTProtocolConverter for duplicate subscriptions, added new tests for retained 
messages and JMX interoperability


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a581d010
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a581d010
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a581d010

Branch: refs/heads/trunk
Commit: a581d010aa96a5e98e107435a242fc55679042a9
Parents: bcb60a4
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Mon May 12 12:37:54 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:18 2014 +0200

--
 .../activemq/broker/region/RegionBroker.java|  4 +
 .../apache/activemq/broker/region/Topic.java|  2 +-
 ...tainedMessageSubscriptionRecoveryPolicy.java |  4 +-
 .../transport/mqtt/MQTTProtocolConverter.java   | 56 +--
 .../activemq/transport/mqtt/MQTTTest.java   | 98 ++--
 5 files changed, 145 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/a581d010/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 46c6de1..da6e1fa 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -215,6 +215,10 @@ public class RegionBroker extends EmptyBroker {
 return brokerService != null ? brokerService.getDestinationPolicy() : 
null;
 }
 
+public ConnectionContext getConnectionContext(String clientId) {
+return clientIdSet.get(clientId);
+}
+
 @Override
 public void addConnection(ConnectionContext context, ConnectionInfo info) 
throws Exception {
 String clientId = info.getClientId();

http://git-wip-us.apache.org/repos/asf/activemq/blob/a581d010/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 4744af8..0186b42 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -305,7 +305,7 @@ public class Topic extends BaseDestination implements Task {
 sub.remove(context, this, dispatched);
 }
 
-protected void recoverRetroactiveMessages(ConnectionContext context, 
Subscription subscription) throws Exception {
+public void recoverRetroactiveMessages(ConnectionContext context, 
Subscription subscription) throws Exception {
 if (subscription.getConsumerInfo().isRetroactive()) {
 subscriptionRecoveryPolicy.recover(context, this, subscription);
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a581d010/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index d350a5f..ba2d1a1 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -37,8 +37,8 @@ import org.apache.activemq.filter.DestinationFilter;
  */
 public class RetainedMessageSubscriptionRecoveryPolicy implements 
SubscriptionRecoveryPolicy {
 
-public static final String RETAIN_PROPERTY = ActiveMQRetain;
-public static final String RETAINED_PROPERTY = ActiveMQRetained;
+public static final String RETAIN_PROPERTY = ActiveMQ.Retain;
+public static final String RETAINED_PROPERTY = ActiveMQ.Retained;
 private volatile MessageReference retainedMessage;
 private SubscriptionRecoveryPolicy wrapped;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/a581d010/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java

[09/13] git commit: Fixed AMQ-5160, fixed test testNoMessageReceivedAfterUnsubscribeMQTT

2014-05-26 Thread dejanb
Fixed AMQ-5160, fixed test testNoMessageReceivedAfterUnsubscribeMQTT


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/70f7c580
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/70f7c580
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/70f7c580

Branch: refs/heads/trunk
Commit: 70f7c5805c0a39034fd1f4f9b76b6e7b293b33ed
Parents: 0a39782
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 00:45:05 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java | 2 --
 1 file changed, 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/70f7c580/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
--
diff --git 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 3c0701e..9c8c9b5 100644
--- 
a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ 
b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1371,8 +1371,6 @@ public class MQTTTest extends AbstractMQTTTest {
 }
 
 // these should not be received
-connectionSub = mqttSub.blockingConnection();
-connectionSub.connect();
 assertNull(connectionSub.receive(5, TimeUnit.SECONDS));
 
 connectionSub.disconnect();



Jenkins build became unstable: ActiveMQ-Java8 » ActiveMQ :: AMQP #3

2014-05-26 Thread Apache Jenkins Server
See 
https://builds.apache.org/job/ActiveMQ-Java8/org.apache.activemq$activemq-amqp/3/changes



Jenkins build is still unstable: ActiveMQ-Java8 » ActiveMQ :: MQTT Protocol #3

2014-05-26 Thread Apache Jenkins Server
See 
https://builds.apache.org/job/ActiveMQ-Java8/org.apache.activemq$activemq-mqtt/3/



Jenkins build is unstable: ActiveMQ-Java8 » ActiveMQ :: Unit Tests #3

2014-05-26 Thread Apache Jenkins Server
See 
https://builds.apache.org/job/ActiveMQ-Java8/org.apache.activemq$activemq-unit-tests/3/changes



Jenkins build is unstable: ActiveMQ-Java8 #3

2014-05-26 Thread Apache Jenkins Server
See https://builds.apache.org/job/ActiveMQ-Java8/3/changes



git commit: https://issues.apache.org/jira/browse/AMQ-4555 - fix regression that groupClass cannot be set on cachedLDAPAuthorizationMap/

2014-05-26 Thread dejanb
Repository: activemq
Updated Branches:
  refs/heads/trunk 6c859676b - 9d656731a


https://issues.apache.org/jira/browse/AMQ-4555 - fix regression that groupClass 
cannot be set on cachedLDAPAuthorizationMap/


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9d656731
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9d656731
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9d656731

Branch: refs/heads/trunk
Commit: 9d656731ab00280368a212b17c2d538b66d0e91b
Parents: 6c85967
Author: Dejan Bosanac de...@nighttale.net
Authored: Mon May 26 16:42:38 2014 +0200
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 16:42:38 2014 +0200

--
 .../security/SimpleCachedLDAPAuthorizationMap.java  | 12 
 .../org/apache/activemq/security/activemq-apacheds.xml  |  2 +-
 2 files changed, 13 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/9d656731/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
 
b/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
index 2de4eb5..e01d5c0 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
@@ -90,6 +90,8 @@ public class SimpleCachedLDAPAuthorizationMap implements 
AuthorizationMap {
 private int refreshInterval = -1;
 private boolean refreshDisabled = false;
 
+protected String groupClass = DefaultAuthorizationMap.DEFAULT_GROUP_CLASS;
+
 // Internal State
 private long lastUpdated;
 
@@ -255,6 +257,7 @@ public class SimpleCachedLDAPAuthorizationMap implements 
AuthorizationMap {
 
 // Create and swap in the new instance with updated LDAP data.
 newMap.setAuthorizationEntries(new 
ArrayListDestinationMapEntry(entries.values()));
+newMap.setGroupClass(groupClass);
 this.map.set(newMap);
 
 updated();
@@ -1107,6 +1110,15 @@ public class SimpleCachedLDAPAuthorizationMap implements 
AuthorizationMap {
 this.refreshInterval = refreshInterval;
 }
 
+public String getGroupClass() {
+return groupClass;
+}
+
+public void setGroupClass(String groupClass) {
+this.groupClass = groupClass;
+map.get().setGroupClass(groupClass);
+}
+
 protected static enum DestinationType {
 QUEUE, TOPIC, TEMP;
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9d656731/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
--
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
index e498ae0..67768c1 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
@@ -39,7 +39,7 @@
 
   authorizationPlugin
   map
-cachedLDAPAuthorizationMap legacyGroupMapping=false 
connectionURL=ldap://localhost:${ldapPort}/
+cachedLDAPAuthorizationMap legacyGroupMapping=false 
connectionURL=ldap://localhost:${ldapPort}; 
groupClass=org.apache.activemq.jaas.GroupPrincipal/
   /map
   /authorizationPlugin
   /plugins