buildbot failure in ASF Buildbot on activemq-site-production
The Buildbot has detected a new failure on builder activemq-site-production while building ASF Buildbot.
Full details are available at:
 http://ci.apache.org/builders/activemq-site-production/builds/826

Buildbot URL: http://ci.apache.org/

Buildslave for this Build: bb-cms-slave

Build Reason: The Nightly scheduler named 'activemq-site-production' triggered this build
Build Source Stamp: [branch activemq/activemq-website] HEAD
Blamelist:

BUILD FAILED: failed compile

sincerely,
 -The Buildbot
Build failed in Jenkins: ActiveMQ-Java7 » ActiveMQ :: Unit Tests #337
See https://builds.apache.org/job/ActiveMQ-Java7/org.apache.activemq$activemq-unit-tests/337/changes

Changes:

[claus.ibsen] Added close destroy-method to Postgres data source. Thanks to Jakub Korab for reporting in https://github.com/apache/activemq/pull/19

--
[...truncated 525 lines...]
Running org.apache.activemq.config.BrokerPropertiesTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.661 sec - in org.apache.activemq.config.BrokerPropertiesTest
Running org.apache.activemq.config.BrokerXmlConfigTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 2.995 sec - in org.apache.activemq.config.BrokerXmlConfigTest
Running org.apache.activemq.config.JDBCConfigTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 7.017 sec - in org.apache.activemq.config.JDBCConfigTest
Running org.apache.activemq.console.command.AMQ3410Test
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 16.656 sec - in org.apache.activemq.console.command.AMQ3410Test
Running org.apache.activemq.console.command.AMQ3411Test
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 10.478 sec - in org.apache.activemq.console.command.AMQ3411Test
Running org.apache.activemq.console.command.PurgeCommandTest
Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 12.175 sec - in org.apache.activemq.console.command.PurgeCommandTest
Running org.apache.activemq.conversions.AmqpAndMqttTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.711 sec - in org.apache.activemq.conversions.AmqpAndMqttTest
Running org.apache.activemq.filter.DestinationFilterTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.069 sec - in org.apache.activemq.filter.DestinationFilterTest
Running org.apache.activemq.filter.DestinationMapMemoryTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.1 sec - in org.apache.activemq.filter.DestinationMapMemoryTest
Running org.apache.activemq.filter.DestinationMapTempDestinationTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.311 sec - in org.apache.activemq.filter.DestinationMapTempDestinationTest
Running org.apache.activemq.filter.DestinationMapTest
Tests run: 16, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.084 sec - in org.apache.activemq.filter.DestinationMapTest
Running org.apache.activemq.filter.DestinationPathTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.145 sec - in org.apache.activemq.filter.DestinationPathTest
Running org.apache.activemq.filter.DummyPolicyTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.458 sec - in org.apache.activemq.filter.DummyPolicyTest
Running org.apache.activemq.jmx.JmxCreateNCTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.246 sec - in org.apache.activemq.jmx.JmxCreateNCTest
Running org.apache.activemq.jmx.OpenTypeSupportTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.262 sec - in org.apache.activemq.jmx.OpenTypeSupportTest
Running org.apache.activemq.jndi.ActiveMQInitialContextFactoryTest
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.212 sec - in org.apache.activemq.jndi.ActiveMQInitialContextFactoryTest
Running org.apache.activemq.jndi.ActiveMQWASInitialContextFactoryTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.197 sec - in org.apache.activemq.jndi.ActiveMQWASInitialContextFactoryTest
Running org.apache.activemq.jndi.CustomConnectionFactoryNameTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.213 sec - in org.apache.activemq.jndi.CustomConnectionFactoryNameTest
Running org.apache.activemq.jndi.DestinationNameWithSlashTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.198 sec - in org.apache.activemq.jndi.DestinationNameWithSlashTest
Running org.apache.activemq.jndi.InitialContextTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.212 sec - in org.apache.activemq.jndi.InitialContextTest
Running org.apache.activemq.jndi.ObjectFactoryTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.197 sec - in org.apache.activemq.jndi.ObjectFactoryTest
Running org.apache.activemq.jndi.XAConnectionFactoryTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.208 sec - in org.apache.activemq.jndi.XAConnectionFactoryTest
Running org.apache.activemq.joramtests.JoramJmsTest
Tests run: 196, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 12.876 sec - in org.apache.activemq.joramtests.JoramJmsTest
Running org.apache.activemq.leveldb.LevelDBStoreBrokerTest
Tests run: 129, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 59.882 sec - in org.apache.activemq.leveldb.LevelDBStoreBrokerTest
Running org.apache.activemq.management.BoundaryStatisticTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.138 sec - in
Jenkins build became unstable: ActiveMQ-Java7 » ActiveMQ :: AMQP #337
See https://builds.apache.org/job/ActiveMQ-Java7/org.apache.activemq$activemq-amqp/337/changes
Jenkins build became unstable: ActiveMQ-Java7 » ActiveMQ :: LevelDB Store #337
See https://builds.apache.org/job/ActiveMQ-Java7/org.apache.activemq$activemq-leveldb-store/337/
Build failed in Jenkins: ActiveMQ-Java7 #337
See https://builds.apache.org/job/ActiveMQ-Java7/337/changes

Changes:

[jsherman] Add Jetty host property to resolve URL message
[claus.ibsen] Added close destroy-method to Postgres data source. Thanks to Jakub Korab for reporting in https://github.com/apache/activemq/pull/19
[tabish121] Turn down default logging to DEBUG for tests.
[tabish121] Pull out JMS client common test bits into a test support class.
[tabish121] https://issues.apache.org/jira/browse/AMQ-5183
[tabish121] Add an option to allow test cases to easily change the transformer
[claus.ibsen] Upgraded to Camel 2.13.1
[dejan] https://issues.apache.org/jira/browse/AMQ-5186 - remove amqp producers
[tabish121] Add option to turn on an OpenWire TCP endpoint.
[tabish121] Use a fixed buffer to handle incoming deliveries from proton via the
[tabish121] https://issues.apache.org/jira/browse/AMQ-5195
[tabish121] https://issues.apache.org/jira/browse/AMQ-5195

--
[...truncated 3563 lines...]
[INFO]
[INFO]
[INFO]
[INFO] Skipping ActiveMQ :: Web Console
[INFO] This project has been banned from the build due to previous failures.
[INFO]
[INFO]
[INFO]
[INFO] Skipping ActiveMQ :: Karaf Integration Tests
[INFO] This project has been banned from the build due to previous failures.
[INFO]
[INFO]
[INFO]
[INFO] Skipping ActiveMQ :: Integration Test :: Spring 3.1
[INFO] This project has been banned from the build due to previous failures.
[INFO]
[INFO]
[INFO]
[INFO] Skipping ActiveMQ :: Assembly
[INFO] This project has been banned from the build due to previous failures.
[INFO]
[INFO]
[INFO] Reactor Summary:
[INFO]
[INFO] ActiveMQ ... SUCCESS [19.351s]
[INFO] ActiveMQ :: Openwire Generator ... SUCCESS [12.782s]
[INFO] ActiveMQ :: Client ... SUCCESS [2:02.637s]
[INFO] ActiveMQ :: Openwire Legacy Support ... SUCCESS [15.107s]
[INFO] ActiveMQ :: JAAS ... SUCCESS [56.536s]
[INFO] ActiveMQ :: Broker ... SUCCESS [1:35.977s]
[INFO] ActiveMQ :: KahaDB Store ... SUCCESS [6:49.898s]
[INFO] ActiveMQ :: STOMP Protocol ... SUCCESS [24:14.999s]
[INFO] ActiveMQ :: MQTT Protocol ... SUCCESS [9:35.916s]
[INFO] ActiveMQ :: JDBC Store ... SUCCESS [12.853s]
[INFO] ActiveMQ :: LevelDB Store ... SUCCESS [20:09.339s]
[INFO] ActiveMQ :: Generic JMS Pool ... SUCCESS [1:32.926s]
[INFO] ActiveMQ :: Pool ... SUCCESS [12.586s]
[INFO] ActiveMQ :: RA ... SUCCESS [2:45.345s]
[INFO] ActiveMQ :: Spring ... SUCCESS [1:23.399s]
[INFO] ActiveMQ :: AMQP ... SUCCESS [18:56.547s]
[INFO] ActiveMQ :: Console ... SUCCESS [14.531s]
[INFO] ActiveMQ :: Partition Management ... SUCCESS [26.208s]
[INFO] ActiveMQ :: Unit Tests ... FAILURE [1:43:04.471s]
[INFO] ActiveMQ :: Camel ... SKIPPED
[INFO] ActiveMQ :: HTTP Protocol Support ... SKIPPED
[INFO] ActiveMQ :: All JAR bundle ... SKIPPED
[INFO] ActiveMQ :: File Server ... SKIPPED
[INFO] ActiveMQ :: Log4j Appender ... SUCCESS [11.173s]
[INFO] ActiveMQ :: Apache Karaf ... SKIPPED
[INFO] ActiveMQ :: RAR ... SKIPPED
[INFO] ActiveMQ :: Run Jar ... SUCCESS [6.544s]
[INFO] ActiveMQ :: Shiro ... SKIPPED
[INFO] ActiveMQ :: Runtime Configuration ... SUCCESS [4:30.341s]
[INFO] ActiveMQ :: Tooling ... SUCCESS [5.612s]
[INFO] ActiveMQ :: Memory Usage Test Plugin ... SUCCESS
buildbot success in ASF Buildbot on activemq-site-production
The Buildbot has detected a restored build on builder activemq-site-production while building ASF Buildbot.
Full details are available at:
 http://ci.apache.org/builders/activemq-site-production/builds/827

Buildbot URL: http://ci.apache.org/

Buildslave for this Build: bb-cms-slave

Build Reason: The Nightly scheduler named 'activemq-site-production' triggered this build
Build Source Stamp: [branch activemq/activemq-website] HEAD
Blamelist:

Build succeeded!

sincerely,
 -The Buildbot
buildbot failure in ASF Buildbot on activemq-site-production
The Buildbot has detected a new failure on builder activemq-site-production while building ASF Buildbot.
Full details are available at:
 http://ci.apache.org/builders/activemq-site-production/builds/828

Buildbot URL: http://ci.apache.org/

Buildslave for this Build: bb-cms-slave

Build Reason: The Nightly scheduler named 'activemq-site-production' triggered this build
Build Source Stamp: [branch activemq/activemq-website] HEAD
Blamelist:

BUILD FAILED: failed compile

sincerely,
 -The Buildbot
[12/13] git commit: Fixed AMQ-5160, fixed durable subscription retroactive recovery
Fixed AMQ-5160, fixed durable subscription retroactive recovery

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6c859676
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6c859676
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6c859676

Branch: refs/heads/trunk
Commit: 6c859676b3995334e96c16c47653ec72fa70f729
Parents: 42ad103
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Fri May 16 14:21:19 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../broker/region/DurableTopicSubscription.java | 18 ---
 .../broker/region/PrefetchSubscription.java | 3 +-
 .../transport/mqtt/MQTTProtocolConverter.java | 21
 .../activemq/transport/mqtt/MQTTTest.java | 34 ++--
 4 files changed, 55 insertions(+), 21 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index e61a608..4c19c62 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 
@@ -120,9 +119,6 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
         if (active.get() || keepDurableSubsActive) {
             Topic topic = (Topic) destination;
             topic.activate(context, this);
-            if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
-                topic.recoverRetroactiveMessages(context, this);
-            }
             this.enqueueCounter += pending.size();
         } else if (destination.getMessageStore() != null) {
             TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@@ -172,12 +168,12 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
                     pending.setMaxAuditDepth(getMaxAuditDepth());
                     pending.setMaxProducersToAudit(getMaxProducersToAudit());
                     pending.start();
-                    // use recovery policy for retroactive topics and consumers
-                    for (Destination destination : durableDestinations.values()) {
-                        Topic topic = (Topic) destination;
-                        if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
-                            topic.recoverRetroactiveMessages(context, this);
-                        }
+                }
+                // use recovery policy every time sub is activated for retroactive topics and consumers
+                for (Destination destination : durableDestinations.values()) {
+                    Topic topic = (Topic) destination;
+                    if (topic.isAlwaysRetroactive() || info.isRetroactive()) {
+                        topic.recoverRetroactiveMessages(context, this);
                     }
                 }
             }
@@ -277,7 +273,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
     }
 
     @Override
-    protected void dispatchPending() throws IOException {
+    public void dispatchPending() throws IOException {
         if (isActive()) {
             super.dispatchPending();
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6c859676/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index ff4c0aa..5ba3b53 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -633,7 +633,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
         dispatched.removeAll(references);
     }
 
-    protected void dispatchPending() throws IOException {
+    // made public so it can be used in
[05/13] git commit: Fixed AMQ-5160, fixed race condition for retained messages
Fixed AMQ-5160, fixed race condition for retained messages

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/86440903
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/86440903
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/86440903

Branch: refs/heads/trunk
Commit: 8644090377eeef09a5afb2e46594c4fa4c311aae
Parents: c915b19
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Tue May 13 12:16:44 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../transport/mqtt/MQTTProtocolConverter.java | 16 +++-
 1 file changed, 11 insertions(+), 5 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/86440903/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
--
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 71a6fcf..88e684e 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -386,6 +386,10 @@ public class MQTTProtocolConverter {
         }
 
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
+        // optimistic add to local maps first to be able to handle commands in onActiveMQCommand
+        subscriptionsByConsumerId.put(id, mqttSubscription);
+        mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+
         final byte[] qos = {-1};
         sendToActiveMQ(consumerInfo, new ResponseHandler() {
             @Override
@@ -401,9 +405,10 @@ public class MQTTProtocolConverter {
             }
         });
 
-        if (qos[0] != SUBSCRIBE_ERROR) {
-            subscriptionsByConsumerId.put(id, mqttSubscription);
-            mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+        if (qos[0] == SUBSCRIBE_ERROR) {
+            // remove from local maps if subscribe failed
+            subscriptionsByConsumerId.remove(id);
+            mqttSubscriptionByTopic.remove(topicName);
         }
         return qos[0];
 
@@ -431,7 +436,7 @@ public class MQTTProtocolConverter {
         final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
         for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
-            // recover retroactive messages for matching subscriptions
+            // recover retroactive messages for matching subscription
             for (Subscription subscription : dest.getConsumers()) {
                 if (subscription.getConsumerInfo().getConsumerId().equals(consumerId)) {
                     try {
@@ -440,6 +445,7 @@ public class MQTTProtocolConverter {
                         throw new MQTTProtocolException("Error recovering retained messages for " + dest.getName() + ": " + e.getMessage(), false, e);
                     }
+                    break;
                 }
             }
         }
@@ -483,7 +489,7 @@ public class MQTTProtocolConverter {
     }
 
     /**
-     * Dispatch a ActiveMQ command
+     * Dispatch an ActiveMQ command
      */
     public void onActiveMQCommand(Command command) throws Exception {
         if (command.isResponse()) {
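The ordering change in this commit (register the subscription in the local maps before sending the subscribe to the broker, then remove it again on failure) can be sketched in isolation. The following is a minimal illustration only, not the ActiveMQ code: the class `OptimisticSubscribeSketch`, the `Broker` interface, the string-valued maps, and the `0x80` value chosen for `SUBSCRIBE_ERROR` are all assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the converter's subscription bookkeeping; not ActiveMQ code.
class OptimisticSubscribeSketch {
    // Assumed failure marker for this sketch.
    static final byte SUBSCRIBE_ERROR = (byte) 0x80;

    /** Hypothetical broker call: returns the granted QoS or SUBSCRIBE_ERROR. */
    interface Broker {
        byte subscribe(String consumerId, String topicName);
    }

    final Map<String, String> subscriptionsByConsumerId = new ConcurrentHashMap<>();
    final Map<String, String> subscriptionByTopic = new ConcurrentHashMap<>();

    byte subscribe(Broker broker, String consumerId, String topicName) {
        // Register first, so anything dispatched while the broker call is
        // still in flight can already look the subscription up.
        subscriptionsByConsumerId.put(consumerId, topicName);
        subscriptionByTopic.put(topicName, consumerId);

        byte qos = broker.subscribe(consumerId, topicName);
        if (qos == SUBSCRIBE_ERROR) {
            // Roll back the optimistic registration when the subscribe failed.
            subscriptionsByConsumerId.remove(consumerId);
            subscriptionByTopic.remove(topicName);
        }
        return qos;
    }

    public static void main(String[] args) {
        OptimisticSubscribeSketch s = new OptimisticSubscribeSketch();
        byte granted = s.subscribe((id, topic) -> (byte) 1, "c1", "a/b");
        byte failed = s.subscribe((id, topic) -> SUBSCRIBE_ERROR, "c2", "x/y");
        System.out.println(granted + " registered=" + s.subscriptionsByConsumerId.containsKey("c1"));
        System.out.println(failed + " registered=" + s.subscriptionsByConsumerId.containsKey("c2"));
    }
}
```

The trade-off in this sketch is that a failed subscribe is briefly visible in the maps; the commit appears to accept that in exchange for not missing messages dispatched during the subscribe call.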
[07/13] git commit: Fixed AMQ-5160, removed redundant MQTTRetainedMessages
Fixed AMQ-5160, removed redundant MQTTRetainedMessages

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b36adffe
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b36adffe
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b36adffe

Branch: refs/heads/trunk
Commit: b36adffe71f782d676bcddf41997fec4d1f831d2
Parents: 5576dc5
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Mon May 12 20:25:11 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../transport/mqtt/MQTTRetainedMessages.java| 95
 1 file changed, 95 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/b36adffe/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
--
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
deleted file mode 100644
index 250366d..000
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTRetainedMessages.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.mqtt;
-
-import org.apache.activemq.Service;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.filter.DestinationMapNode;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ServiceSupport;
-import org.fusesource.hawtbuf.Buffer;
-import org.fusesource.hawtbuf.UTF8Buffer;
-import org.fusesource.mqtt.codec.PUBLISH;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class MQTTRetainedMessages extends ServiceSupport {
-    private static final Logger LOG = LoggerFactory.getLogger(MQTTRetainedMessages.class);
-    private static final Object LOCK = new Object();
-
-    DestinationMapNode retainedMessages = new DestinationMapNode(null);
-
-    private MQTTRetainedMessages(){
-    }
-
-    @Override
-    protected void doStop(ServiceStopper stopper) throws Exception {
-        synchronized (this) {
-            retainedMessages = new DestinationMapNode(null);
-        }
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-    }
-
-    public void addMessage(ActiveMQTopic dest, PUBLISH publish){
-        synchronized (this) {
-            retainedMessages.set(dest.getDestinationPaths(), 0, publish);
-        }
-    }
-
-    public Set<PUBLISH> getMessages(ActiveMQTopic topic){
-        Set answer = new HashSet();
-        synchronized (this) {
-            retainedMessages.appendMatchingValues(answer, topic.getDestinationPaths(), 0);
-        }
-        return (Set<PUBLISH>)answer;
-    }
-
-    public static MQTTRetainedMessages getMQTTRetainedMessages(BrokerService broker){
-        MQTTRetainedMessages result = null;
-        if (broker != null){
-            synchronized (LOCK){
-                Service[] services = broker.getServices();
-                if (services != null){
-                    for (Service service:services){
-                        if (service instanceof MQTTRetainedMessages){
-                            return (MQTTRetainedMessages) service;
-                        }
-                    }
-                }
-                result = new MQTTRetainedMessages();
-                broker.addService(result);
-                if (broker != null && broker.isStarted()){
-                    try {
-                        result.start();
-                    } catch (Exception e) {
-                        LOG.warn("Couldn't start MQTTRetainedMessages");
-                    }
-                }
-            }
-        }
-
-
-        return result;
-    }
-}
[11/13] git commit: Fixed AMQ-5160, restored previous DurableSubscription behaviour of only recovering messages when cursor is empty, retained messages are always recovered
Fixed AMQ-5160, restored previous DurableSubscription behaviour of only recovering messages when cursor is empty, retained messages are always recovered

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/42ad1039
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/42ad1039
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/42ad1039

Branch: refs/heads/trunk
Commit: 42ad1039cb51b47a16f46f3d8f0fe8bf36ffdd1d
Parents: 8644090
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Tue May 13 13:15:04 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../activemq/broker/region/DurableTopicSubscription.java | 5 +
 .../policy/RetainedMessageSubscriptionRecoveryPolicy.java | 10 +-
 2 files changed, 14 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index c82e6ef..e61a608 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -137,6 +137,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
         dispatchPending();
     }
 
+    // used by RetaineMessageSubscriptionRecoveryPolicy
+    public boolean isEmpty(Topic topic) {
+        return pending.isEmpty(topic);
+    }
+
     public void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info, RegionBroker regionBroker) throws Exception {
         if (!active.get()) {
             this.context = context;

http://git-wip-us.apache.org/repos/asf/activemq/blob/42ad1039/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index f1573fa..ea07c8b 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -22,6 +22,7 @@ import java.util.List;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.SubscriptionRecovery;
 import org.apache.activemq.broker.region.Topic;
@@ -74,7 +75,14 @@ public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRe
             sub.addRecoveredMessage(context, retainedMessage);
         }
         if (wrapped != null) {
-            wrapped.recover(context, topic, sub);
+            // retain default ActiveMQ behaviour of recovering messages only for empty durable subscriptions
+            boolean recover = true;
+            if (sub instanceof DurableTopicSubscription && !((DurableTopicSubscription)sub).isEmpty(topic)) {
+                recover = false;
+            }
+            if (recover) {
+                wrapped.recover(context, topic, sub);
+            }
         }
     }
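The rule this commit describes (the retained message is always recovered, while the wrapped policy is skipped for a durable subscription that already has pending messages) can be shown with a toy model. This is a sketch under stated assumptions, not the ActiveMQ implementation: `RetainedRecoverySketch`, `Subscription`, `RecoveryPolicy`, `DurableSub`, and `RetainedPolicy` are hypothetical names, and messages are plain strings. Because the toy `deliver` appends straight to the pending list, the sketch checks emptiness before delivering the retained message.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a retained-message policy that wraps another recovery policy.
class RetainedRecoverySketch {
    interface Subscription {
        void deliver(String message);
    }

    interface RecoveryPolicy {
        void recover(Subscription sub);
    }

    /** Hypothetical durable subscription that tracks its pending messages. */
    static class DurableSub implements Subscription {
        final List<String> pending = new ArrayList<>();

        @Override
        public void deliver(String message) {
            pending.add(message);
        }

        boolean isEmpty() {
            return pending.isEmpty();
        }
    }

    static class RetainedPolicy implements RecoveryPolicy {
        private final String retained;       // may be null when nothing is retained
        private final RecoveryPolicy wrapped; // may be null when no policy is wrapped

        RetainedPolicy(String retained, RecoveryPolicy wrapped) {
            this.retained = retained;
            this.wrapped = wrapped;
        }

        @Override
        public void recover(Subscription sub) {
            // Skip the wrapped policy only for durable subscriptions with a backlog.
            boolean recoverWrapped = !(sub instanceof DurableSub) || ((DurableSub) sub).isEmpty();
            if (retained != null) {
                sub.deliver(retained); // the retained message is always recovered
            }
            if (wrapped != null && recoverWrapped) {
                wrapped.recover(sub);
            }
        }
    }

    public static void main(String[] args) {
        RecoveryPolicy policy = new RetainedPolicy("retained", s -> s.deliver("last-image"));
        DurableSub fresh = new DurableSub();
        DurableSub backlog = new DurableSub();
        backlog.deliver("old");
        policy.recover(fresh);
        policy.recover(backlog);
        System.out.println(fresh.pending);
        System.out.println(backlog.pending);
    }
}
```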
[01/13] git commit: Fixed AMQ-5160, fixed browse() to include messages from wrapped policy
Repository: activemq
Updated Branches:
  refs/heads/trunk ba519d8bd -> 6c859676b

Fixed AMQ-5160, fixed browse() to include messages from wrapped policy

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5576dc5d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5576dc5d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5576dc5d

Branch: refs/heads/trunk
Commit: 5576dc5d74e8ccb284dda19d60872a1efb03fcd0
Parents: a581d01
Author: Dhiraj Bokde <dhira...@yahoo.com>
Authored: Mon May 12 19:51:21 2014 -0700
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Mon May 26 11:07:18 2014 +0200

--
 .../RetainedMessageSubscriptionRecoveryPolicy.java| 14 --
 1 file changed, 12 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/5576dc5d/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index ba2d1a1..fb0313f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.broker.region.policy;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.activemq.broker.Broker;
@@ -83,14 +84,23 @@ public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRe
     }
 
     public Message[] browse(ActiveMQDestination destination) throws Exception {
-        List<Message> result = new ArrayList<Message>();
+        final List<Message> result = new ArrayList<Message>();
         if (retainedMessage != null) {
             DestinationFilter filter = DestinationFilter.parseFilter(destination);
             if (filter.matches(retainedMessage.getMessage().getDestination())) {
                 result.add(retainedMessage.getMessage());
             }
         }
-        return result.toArray(new Message[result.size()]);
+        Message[] messages = result.toArray(new Message[result.size()]);
+        if (wrapped != null) {
+            final Message[] wrappedMessages = wrapped.browse(destination);
+            if (wrappedMessages != null && wrappedMessages.length > 0) {
+                final int origLen = messages.length;
+                messages = Arrays.copyOf(messages, origLen + wrappedMessages.length);
+                System.arraycopy(wrappedMessages, 0, messages, origLen, wrappedMessages.length);
+            }
+        }
+        return messages;
     }
 
     public SubscriptionRecoveryPolicy copy() {
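The array merge that the new `browse()` performs (grow the result with `Arrays.copyOf`, then append the wrapped policy's messages with `System.arraycopy`) is easy to try on its own. The sketch below uses `String` in place of `Message`; the class `BrowseMergeSketch` and its `merge` helper are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;

// Standalone illustration of the copyOf + arraycopy append used in browse().
class BrowseMergeSketch {
    /** Returns own followed by wrapped; returns own unchanged if wrapped is null or empty. */
    static String[] merge(String[] own, String[] wrapped) {
        String[] messages = own;
        if (wrapped != null && wrapped.length > 0) {
            final int origLen = messages.length;
            // copyOf allocates a longer array and keeps the first origLen elements.
            messages = Arrays.copyOf(messages, origLen + wrapped.length);
            // arraycopy fills the newly added tail with the wrapped messages.
            System.arraycopy(wrapped, 0, messages, origLen, wrapped.length);
        }
        return messages;
    }

    public static void main(String[] args) {
        String[] merged = merge(new String[] {"retained"}, new String[] {"a", "b"});
        System.out.println(Arrays.toString(merged));
    }
}
```

Guarding on `null` and on length zero means the common case with no wrapped messages returns the original array without an extra allocation.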
[03/13] git commit: Partial fix for AMQ-5160, attempts to resolve retained messages using subscription recovery policy, but fails to resend retained messages for duplicate subscriptions
Partial fix for AMQ-5160, attempts to resolve retained messages using subscription recovery policy, but fails to resend retained messages for duplicate subscriptions

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bcb60a48
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bcb60a48
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bcb60a48

Branch: refs/heads/trunk
Commit: bcb60a482cdc1cd2d2de9b4dba6a38ef831b2fa4
Parents: ba519d8
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Wed May 7 19:05:36 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:18 2014 +0200

--
 .../apache/activemq/broker/region/Topic.java| 14 ++-
 ...tainedMessageSubscriptionRecoveryPolicy.java | 107 +++
 .../transport/mqtt/MQTTProtocolConverter.java | 71 +---
 .../activemq/transport/mqtt/MQTTTest.java | 86 ---
 4 files changed, 215 insertions(+), 63 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 277ce05..4744af8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -33,7 +33,7 @@ import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.policy.DispatchPolicy;
 import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
-import org.apache.activemq.broker.region.policy.NoSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.RetainedMessageSubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
 import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
 import org.apache.activemq.broker.util.InsertionCountList;
@@ -91,7 +91,7 @@ public class Topic extends BaseDestination implements Task {
             subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
             setAlwaysRetroactive(true);
         } else {
-            subscriptionRecoveryPolicy = new NoSubscriptionRecoveryPolicy();
+            subscriptionRecoveryPolicy = new RetainedMessageSubscriptionRecoveryPolicy(null);
         }
         this.taskRunner = taskFactory.createTaskRunner(this, "Topic " + destination.getPhysicalName());
     }
@@ -675,8 +675,14 @@ public class Topic extends BaseDestination implements Task {
         return subscriptionRecoveryPolicy;
     }

-    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
-        this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
+    public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy recoveryPolicy) {
+        if (this.subscriptionRecoveryPolicy != null && this.subscriptionRecoveryPolicy instanceof RetainedMessageSubscriptionRecoveryPolicy) {
+            // allow users to combine retained message policy with other ActiveMQ policies
+            RetainedMessageSubscriptionRecoveryPolicy policy = (RetainedMessageSubscriptionRecoveryPolicy) this.subscriptionRecoveryPolicy;
+            policy.setWrapped(recoveryPolicy);
+        } else {
+            this.subscriptionRecoveryPolicy = recoveryPolicy;
+        }
     }

     // Implementation methods

http://git-wip-us.apache.org/repos/asf/activemq/blob/bcb60a48/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
new file mode 100644
index 000..d350a5f
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a
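
The setSubscriptionRecoveryPolicy() change in the Topic.java hunk above keeps the retained-message policy in place and stores any user-supplied policy inside it. A minimal stand-alone sketch of that wrapping idea follows. All type names here (RecoveryPolicySketch, RetainedPolicySketch, TopicSketch) are hypothetical stand-ins, not the ActiveMQ classes, and the name() method exists only to make the composition visible.

```java
// Hypothetical stand-in for SubscriptionRecoveryPolicy; not the ActiveMQ interface.
interface RecoveryPolicySketch {
    String name();
}

// Hypothetical stand-in for the retained-message policy that can wrap another policy.
final class RetainedPolicySketch implements RecoveryPolicySketch {
    private RecoveryPolicySketch wrapped;

    void setWrapped(RecoveryPolicySketch wrapped) {
        this.wrapped = wrapped;
    }

    public String name() {
        // Report the combination so callers can see that both policies are active.
        return wrapped == null ? "retained" : "retained+" + wrapped.name();
    }
}

// Hypothetical stand-in for Topic, showing only the setter logic from the hunk.
final class TopicSketch {
    private RecoveryPolicySketch policy = new RetainedPolicySketch();

    void setSubscriptionRecoveryPolicy(RecoveryPolicySketch recoveryPolicy) {
        if (policy instanceof RetainedPolicySketch) {
            // Keep the retained policy and delegate to the new one through it.
            ((RetainedPolicySketch) policy).setWrapped(recoveryPolicy);
        } else {
            policy = recoveryPolicy;
        }
    }

    RecoveryPolicySketch getSubscriptionRecoveryPolicy() {
        return policy;
    }
}
```

Note that instanceof already evaluates to false for null, so the explicit null check in the committed code is redundant but harmless.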
[13/13] git commit: Fixed AMQ-5160, removed producer's retain property from retained messages
Fixed AMQ-5160, removed producer's retain property from retained messages

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c915b19a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c915b19a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c915b19a

Branch: refs/heads/trunk
Commit: c915b19a205d37c2faad05178de716dca64981d5
Parents: 78950ec
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 12:15:14 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../region/policy/RetainedMessageSubscriptionRecoveryPolicy.java| 1 +
 1 file changed, 1 insertion(+)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/c915b19a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index fb0313f..f1573fa 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -56,6 +56,7 @@ public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRe
             if (message.getContent().getLength() > 0) {
                 // non zero length message content
                 retainedMessage = message.copy();
+                retainedMessage.getMessage().removeProperty(RETAIN_PROPERTY);
                 retainedMessage.getMessage().setProperty(RETAINED_PROPERTY, true);
             } else {
                 // clear retained message
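
The branch touched by this commit stores a copy of a non-empty message as the retained message and clears it when the payload is empty. The sketch below reproduces that behavior with plain Java collections. SketchMessage and RetainedSlot are hypothetical names, and the guard on the retain flag at the top of onPublish() is an assumption, since the hunk shows only the inner branch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a broker message; not the ActiveMQ Message class.
final class SketchMessage {
    final byte[] payload;
    final Map<String, Object> properties = new HashMap<>();

    SketchMessage(byte[] payload) {
        this.payload = payload;
    }

    SketchMessage copy() {
        SketchMessage c = new SketchMessage(payload.clone());
        c.properties.putAll(properties);
        return c;
    }
}

// Holds at most one retained message, mirroring the if/else in the diff.
final class RetainedSlot {
    // Property names as they appear in the AMQ-5160 commits.
    static final String RETAIN_PROPERTY = "ActiveMQ.Retain";
    static final String RETAINED_PROPERTY = "ActiveMQ.Retained";

    private volatile SketchMessage retained;

    void onPublish(SketchMessage message) {
        // Assumption: only messages flagged by the producer are considered.
        if (!Boolean.TRUE.equals(message.properties.get(RETAIN_PROPERTY))) {
            return;
        }
        if (message.payload.length > 0) {
            SketchMessage copy = message.copy();
            copy.properties.remove(RETAIN_PROPERTY);      // drop the producer's request flag
            copy.properties.put(RETAINED_PROPERTY, true); // mark the stored copy as retained
            retained = copy;
        } else {
            retained = null; // an empty payload clears the retained message
        }
    }

    SketchMessage retained() {
        return retained;
    }
}
```

Removing the request flag from the stored copy means a subscriber that later receives it sees only the retained marker, not the producer's original instruction.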
[08/13] git commit: Fixed AMQ-5160, force durable subscriptions to always recover retroactive messages
Fixed AMQ-5160, force durable subscriptions to always recover retroactive messages

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8947a09e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8947a09e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8947a09e

Branch: refs/heads/trunk
Commit: 8947a09eaa075cb8f5e86599404198e0a5c91910
Parents: b36adff
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 00:29:03 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../broker/region/DurableTopicSubscription.java | 16 ++--
 1 file changed, 6 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/8947a09e/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 8cb6ecc..6501e58 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -120,9 +120,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
         if (active.get() || keepDurableSubsActive) {
             Topic topic = (Topic) destination;
             topic.activate(context, this);
-            if (pending.isEmpty(topic)) {
-                topic.recoverRetroactiveMessages(context, this);
-            }
+            // always use the recovery policy
+            topic.recoverRetroactiveMessages(context, this);
             this.enqueueCounter += pending.size();
         } else if (destination.getMessageStore() != null) {
             TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
@@ -167,13 +166,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
                 pending.setMaxAuditDepth(getMaxAuditDepth());
                 pending.setMaxProducersToAudit(getMaxProducersToAudit());
                 pending.start();
-                // If nothing was in the persistent store, then try to use the
-                // recovery policy.
-                if (pending.isEmpty()) {
-                    for (Destination destination : durableDestinations.values()) {
-                        Topic topic = (Topic) destination;
-                        topic.recoverRetroactiveMessages(context, this);
-                    }
+                // always use the recovery policy.
+                for (Destination destination : durableDestinations.values()) {
+                    Topic topic = (Topic) destination;
+                    topic.recoverRetroactiveMessages(context, this);
                 }
             }
         }
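
The effect of dropping the pending.isEmpty() guard can be illustrated with a small model. In the sketch below, lists of strings stand in for the pending cursor and for whatever the recovery policy would replay. The class and method names are hypothetical, and the order in which messages are appended is illustrative only, not a statement about broker dispatch order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of durable subscription activation; not ActiveMQ code.
final class DurableActivationSketch {

    // Before the change: the recovery policy ran only when nothing was pending.
    static List<String> activateBefore(List<String> pending, List<String> recoverable) {
        List<String> dispatched = new ArrayList<>(pending);
        if (pending.isEmpty()) {
            dispatched.addAll(recoverable);
        }
        return dispatched;
    }

    // After the change: the recovery policy always runs.
    static List<String> activateAfter(List<String> pending, List<String> recoverable) {
        List<String> dispatched = new ArrayList<>(pending);
        dispatched.addAll(recoverable);
        return dispatched;
    }
}
```

With a non-empty backlog, the old guard skips the recovery policy entirely, so a retained message would not reach a reconnecting durable subscriber. The new code consults the policy regardless of backlog.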
[04/13] git commit: Fixed AMQ-5160, polished MQTT tests
Fixed AMQ-5160, polished MQTT tests

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a39782b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a39782b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a39782b

Branch: refs/heads/trunk
Commit: 0a39782bf5d95fc0ae6d54a7fa1469b230621358
Parents: 88c6ee9
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 00:30:11 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../activemq/transport/mqtt/MQTTTest.java | 31 ++--
 1 file changed, 22 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/0a39782b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
--
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index e11f6e9..3c0701e 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -379,7 +379,7 @@ public class MQTTTest extends AbstractMQTTTest {
             connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
             connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) });

-            Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            Message msg = connection.receive(5, TimeUnit.SECONDS);
             assertNotNull("No message for " + topic, msg);
             assertEquals(RETAINED + topic, new String(msg.getPayload()));
             msg.ack();
@@ -406,7 +406,7 @@ public class MQTTTest extends AbstractMQTTTest {
             assertNotEquals("Subscribe failed " + wildcard, (byte)0x80, qos[0]);

             // test retained messages
-            Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
+            Message msg = connection.receive(5, TimeUnit.SECONDS);
             do {
                 assertNotNull("RETAINED null " + wildcard, msg);
                 assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
@@ -459,7 +459,7 @@ public class MQTTTest extends AbstractMQTTTest {
             final BlockingConnection connection = mqtt.blockingConnection();
             connection.connect();
             connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
-            connection.subscribe(new Topic[] { new Topic(topic, QoS.valueOf(topic)) });
+            connection.subscribe(new Topic[]{new Topic(topic, QoS.valueOf(topic))});

             final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull(msg);
@@ -472,7 +472,7 @@ public class MQTTTest extends AbstractMQTTTest {
             assertEquals(i, actualQoS[0]);
             msg.ack();

-            connection.unsubscribe(new String[] { topic });
+            connection.unsubscribe(new String[]{topic});
             connection.disconnect();
         }

@@ -1341,10 +1341,9 @@ public class MQTTTest extends AbstractMQTTTest {
         BlockingConnection connectionSub = mqttSub.blockingConnection();
         connectionSub.connect();
         connectionSub.subscribe(topics);
-        connectionSub.unsubscribe(new String[] { "TopicA" });
         connectionSub.disconnect();

-        for (int i = 0; i < 10; i++) {
+        for (int i = 0; i < 5; i++) {
             String payload = "Message " + i;
             connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
         }
@@ -1353,14 +1352,28 @@ public class MQTTTest extends AbstractMQTTTest {
         connectionSub.connect();

         int received = 0;
-        for (int i = 0; i < 10; ++i) {
+        for (int i = 0; i < 5; ++i) {
             Message message = connectionSub.receive(5, TimeUnit.SECONDS);
-            assertNotNull(message);
+            assertNotNull("Missing message " + i, message);
             LOG.info("Message is " + new String(message.getPayload()));
             received++;
             message.ack();
         }
-        assertEquals(10, received);
+        assertEquals(5, received);
+
+        // unsubscribe from topic
+        connectionSub.unsubscribe(new String[]{"TopicA"});
+
+        // send more messages
+        for (int i = 0; i < 5; i++) {
+            String payload = "Message " + i;
+            connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
+        }
+
+        // these should not be received
+        connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
+
[10/13] git commit: Fixed AMQ-5160, remove durable subscription in onUnsubscribe()
Fixed AMQ-5160, remove durable subscription in onUnsubscribe()

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/88c6ee97
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/88c6ee97
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/88c6ee97

Branch: refs/heads/trunk
Commit: 88c6ee97e0fc8652f2dba16786cb8cbd7b80a1b7
Parents: 8947a09
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 00:29:40 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../transport/mqtt/MQTTProtocolConverter.java | 50
 1 file changed, 31 insertions(+), 19 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/88c6ee97/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
--
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index cbb6415..71a6fcf 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -361,12 +361,13 @@ public class MQTTProtocolConverter {
         ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));

         if( mqttSubscriptionByTopic.containsKey(topicName)) {
-            if (topicQoS != mqttSubscriptionByTopic.get(topicName).qos()) {
+            final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
+            if (topicQoS != mqttSubscription.qos()) {
                 // remove old subscription as the QoS has changed
                 onUnSubscribe(topicName);
             } else {
                 // duplicate SUBSCRIBE packet, find all matching topics and resend retained messages
-                resendRetainedMessages(topicName, destination);
+                resendRetainedMessages(topicName, destination, mqttSubscription);

                 return (byte) topicQoS.ordinal();
             }
@@ -408,7 +409,8 @@ public class MQTTProtocolConverter {
         return qos[0];
     }

-    private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination) throws MQTTProtocolException {
+    private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination,
+                                        MQTTSubscription mqttSubscription) throws MQTTProtocolException {
         // get TopicRegion
         RegionBroker regionBroker;
         try {
@@ -418,25 +420,26 @@ public class MQTTProtocolConverter {
         }
         final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();

+        final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
+        final ConsumerId consumerId = consumerInfo.getConsumerId();
+
+        // use actual client id used to create connection to lookup connection context
+        final String connectionInfoClientId = connectionInfo.getClientId();
+        final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfoClientId);
+
         // get all matching Topics
         final Set<org.apache.activemq.broker.region.Destination> matchingDestinations = topicRegion.getDestinations(destination);
         for (org.apache.activemq.broker.region.Destination dest : matchingDestinations) {
-            // find matching MQTT subscription for this client
-            final String mqttTopicName = convertActiveMQToMQTT(dest.getName());
-            final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(new UTF8Buffer(mqttTopicName));
-            if (mqttSubscription != null) {
-                // recover retroactive messages for matching subscription
-                final ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
-                final ConsumerId consumerId = consumerInfo.getConsumerId();
-                final Subscription subscription = topicRegion.getSubscriptions().get(consumerId);
-
-                // use actual client id used to create connection to lookup connection context
-                final ConnectionContext connectionContext = regionBroker.getConnectionContext(connectionInfo.getClientId());
-                try {
-                    ((org.apache.activemq.broker.region.Topic)dest).recoverRetroactiveMessages(connectionContext, subscription);
-                } catch (Exception e) {
-                    throw new MQTTProtocolException("Error recovering retained messages for " +
-                        mqttTopicName + ": " + e.getMessage(),
[02/13] git commit: Fixed AMQ-5160, Added RegionBroker.getConnectionContext(), made Topic.recoverRetroactiveMessages() public to force message recovery from MQTTProtocolConverter for duplicate subscri
Fixed AMQ-5160, Added RegionBroker.getConnectionContext(), made Topic.recoverRetroactiveMessages() public to force message recovery from MQTTProtocolConverter for duplicate subscriptions, added new tests for retained messages and JMX interoperability

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a581d010
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a581d010
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a581d010

Branch: refs/heads/trunk
Commit: a581d010aa96a5e98e107435a242fc55679042a9
Parents: bcb60a4
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Mon May 12 12:37:54 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:18 2014 +0200

--
 .../activemq/broker/region/RegionBroker.java| 4 +
 .../apache/activemq/broker/region/Topic.java| 2 +-
 ...tainedMessageSubscriptionRecoveryPolicy.java | 4 +-
 .../transport/mqtt/MQTTProtocolConverter.java | 56 +--
 .../activemq/transport/mqtt/MQTTTest.java | 98 ++--
 5 files changed, 145 insertions(+), 19 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/a581d010/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 46c6de1..da6e1fa 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -215,6 +215,10 @@ public class RegionBroker extends EmptyBroker {
         return brokerService != null ? brokerService.getDestinationPolicy() : null;
     }

+    public ConnectionContext getConnectionContext(String clientId) {
+        return clientIdSet.get(clientId);
+    }
+
     @Override
     public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
         String clientId = info.getClientId();

http://git-wip-us.apache.org/repos/asf/activemq/blob/a581d010/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 4744af8..0186b42 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -305,7 +305,7 @@ public class Topic extends BaseDestination implements Task {
         sub.remove(context, this, dispatched);
     }

-    protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
+    public void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
         if (subscription.getConsumerInfo().isRetroactive()) {
             subscriptionRecoveryPolicy.recover(context, this, subscription);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a581d010/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
index d350a5f..ba2d1a1 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/RetainedMessageSubscriptionRecoveryPolicy.java
@@ -37,8 +37,8 @@ import org.apache.activemq.filter.DestinationFilter;
  */
 public class RetainedMessageSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {

-    public static final String RETAIN_PROPERTY = "ActiveMQRetain";
-    public static final String RETAINED_PROPERTY = "ActiveMQRetained";
+    public static final String RETAIN_PROPERTY = "ActiveMQ.Retain";
+    public static final String RETAINED_PROPERTY = "ActiveMQ.Retained";

     private volatile MessageReference retainedMessage;
     private SubscriptionRecoveryPolicy wrapped;

http://git-wip-us.apache.org/repos/asf/activemq/blob/a581d010/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
[09/13] git commit: Fixed AMQ-5160, fixed test testNoMessageReceivedAfterUnsubscribeMQTT
Fixed AMQ-5160, fixed test testNoMessageReceivedAfterUnsubscribeMQTT

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/70f7c580
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/70f7c580
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/70f7c580

Branch: refs/heads/trunk
Commit: 70f7c5805c0a39034fd1f4f9b76b6e7b293b33ed
Parents: 0a39782
Author: Dhiraj Bokde dhira...@yahoo.com
Authored: Tue May 13 00:45:05 2014 -0700
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 11:07:19 2014 +0200

--
 .../src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java | 2 --
 1 file changed, 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/70f7c580/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
--
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 3c0701e..9c8c9b5 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -1371,8 +1371,6 @@ public class MQTTTest extends AbstractMQTTTest {
         }

         // these should not be received
-        connectionSub = mqttSub.blockingConnection();
-        connectionSub.connect();
         assertNull(connectionSub.receive(5, TimeUnit.SECONDS));

         connectionSub.disconnect();
Jenkins build became unstable: ActiveMQ-Java8 » ActiveMQ :: AMQP #3
See https://builds.apache.org/job/ActiveMQ-Java8/org.apache.activemq$activemq-amqp/3/changes
Jenkins build is still unstable: ActiveMQ-Java8 » ActiveMQ :: MQTT Protocol #3
See https://builds.apache.org/job/ActiveMQ-Java8/org.apache.activemq$activemq-mqtt/3/
Jenkins build is unstable: ActiveMQ-Java8 » ActiveMQ :: Unit Tests #3
See https://builds.apache.org/job/ActiveMQ-Java8/org.apache.activemq$activemq-unit-tests/3/changes
Jenkins build is unstable: ActiveMQ-Java8 #3
See https://builds.apache.org/job/ActiveMQ-Java8/3/changes
git commit: https://issues.apache.org/jira/browse/AMQ-4555 - fix regression that groupClass cannot be set on <cachedLDAPAuthorizationMap/>
Repository: activemq
Updated Branches:
  refs/heads/trunk 6c859676b -> 9d656731a

https://issues.apache.org/jira/browse/AMQ-4555 - fix regression that groupClass cannot be set on <cachedLDAPAuthorizationMap/>

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/9d656731
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/9d656731
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/9d656731

Branch: refs/heads/trunk
Commit: 9d656731ab00280368a212b17c2d538b66d0e91b
Parents: 6c85967
Author: Dejan Bosanac de...@nighttale.net
Authored: Mon May 26 16:42:38 2014 +0200
Committer: Dejan Bosanac de...@nighttale.net
Committed: Mon May 26 16:42:38 2014 +0200

--
 .../security/SimpleCachedLDAPAuthorizationMap.java | 12
 .../org/apache/activemq/security/activemq-apacheds.xml | 2 +-
 2 files changed, 13 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/activemq/blob/9d656731/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
--
diff --git a/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java b/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
index 2de4eb5..e01d5c0 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/security/SimpleCachedLDAPAuthorizationMap.java
@@ -90,6 +90,8 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
     private int refreshInterval = -1;
     private boolean refreshDisabled = false;

+    protected String groupClass = DefaultAuthorizationMap.DEFAULT_GROUP_CLASS;
+
     // Internal State
     private long lastUpdated;

@@ -255,6 +257,7 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
         // Create and swap in the new instance with updated LDAP data.
         newMap.setAuthorizationEntries(new ArrayList<DestinationMapEntry>(entries.values()));
+        newMap.setGroupClass(groupClass);
         this.map.set(newMap);
         updated();

@@ -1107,6 +1110,15 @@ public class SimpleCachedLDAPAuthorizationMap implements AuthorizationMap {
         this.refreshInterval = refreshInterval;
     }

+    public String getGroupClass() {
+        return groupClass;
+    }
+
+    public void setGroupClass(String groupClass) {
+        this.groupClass = groupClass;
+        map.get().setGroupClass(groupClass);
+    }
+
     protected static enum DestinationType {
         QUEUE, TOPIC, TEMP;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/9d656731/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
--
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
index e498ae0..67768c1 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/security/activemq-apacheds.xml
@@ -39,7 +39,7 @@
             <authorizationPlugin>
                 <map>
-                    <cachedLDAPAuthorizationMap legacyGroupMapping="false" connectionURL="ldap://localhost:${ldapPort}"/>
+                    <cachedLDAPAuthorizationMap legacyGroupMapping="false" connectionURL="ldap://localhost:${ldapPort}" groupClass="org.apache.activemq.jaas.GroupPrincipal"/>
                 </map>
             </authorizationPlugin>
         </plugins>
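
The fix applies groupClass in two places: the setter pushes it into the map instance that is currently live, and the refresh path copies it onto each replacement map before the swap. A minimal sketch of that pattern follows, assuming the live map is held in an AtomicReference as the map.get() and map.set() calls in the diff suggest. InnerMapSketch, CachedMapSketch and the "default.Group" value are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the inner authorization map; not the ActiveMQ class.
final class InnerMapSketch {
    private String groupClass = "default.Group"; // placeholder default, not the real constant

    void setGroupClass(String groupClass) {
        this.groupClass = groupClass;
    }

    String getGroupClass() {
        return groupClass;
    }
}

// Hypothetical stand-in for the cached map that periodically swaps its inner map.
final class CachedMapSketch {
    private final AtomicReference<InnerMapSketch> map = new AtomicReference<>(new InnerMapSketch());
    private String groupClass = "default.Group";

    void setGroupClass(String groupClass) {
        this.groupClass = groupClass;         // remembered for future refreshes
        map.get().setGroupClass(groupClass);  // applied to the live instance now
    }

    // Models the refresh path: build a new inner map and swap it in.
    void refresh() {
        InnerMapSketch newMap = new InnerMapSketch();
        newMap.setGroupClass(groupClass);     // without this, a refresh would reset the setting
        map.set(newMap);
    }

    String effectiveGroupClass() {
        return map.get().getGroupClass();
    }
}
```

Both steps are needed: the setter alone would lose the value on the next refresh, and the refresh copy alone would delay the change until the next reload.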