Repository: activemq Updated Branches: refs/heads/master 5c035fac2 -> d0c95146c
https://issues.apache.org/jira/browse/AMQ-6579 Remove incorrect dispatch counter increment on message expiration in TopicSubscription when a pending limit strategy is set Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/d0c95146 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/d0c95146 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/d0c95146 Branch: refs/heads/master Commit: d0c95146c37f96ca69e1fe82c4b2fe9208f8184e Parents: 5c035fa Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Wed Jan 25 11:16:43 2017 -0500 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Wed Jan 25 11:16:43 2017 -0500 ---------------------------------------------------------------------- .../broker/region/TopicSubscription.java | 1 - .../org/apache/activemq/bugs/AMQ6579Test.java | 128 +++++++++++++++++++ 2 files changed, 128 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/d0c95146/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index d993333..095735a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -232,7 +232,6 @@ public class TopicSubscription extends AbstractSubscription { node.decrementReferenceCount(); if (node.isExpired()) { matched.remove(); - getSubscriptionStatistics().getDispatched().increment(); node.decrementReferenceCount(); if (broker.isExpired(node)) { ((Destination) 
node.getRegionDestination()).getDestinationStatistics().getExpired().increment(); http://git-wip-us.apache.org/repos/asf/activemq/blob/d0c95146/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6579Test.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6579Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6579Test.java new file mode 100644 index 0000000..cc45591 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6579Test.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ6579Test { + + protected BrokerService brokerService; + protected Connection connection; + protected Session session; + protected ActiveMQTopic topic; + protected Destination amqDestination; + protected MessageConsumer consumer; + + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + + PolicyEntry policy = new PolicyEntry(); + policy.setTopicPrefetch(5); + ConstantPendingMessageLimitStrategy pendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy(); + pendingMessageLimitStrategy.setLimit(5); + policy.setPendingMessageLimitStrategy(pendingMessageLimitStrategy); + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + brokerService.setDestinationPolicy(pMap); + + TransportConnector tcp = brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + connection = new ActiveMQConnectionFactory(tcp.getPublishableConnectString()).createConnection(); + connection.start(); + session = connection.createSession(false, 
ActiveMQSession.AUTO_ACKNOWLEDGE); + topic = new ActiveMQTopic("test.topic"); + consumer = session.createConsumer(topic); + amqDestination = TestSupport.getDestination(brokerService, topic); + } + + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + brokerService.stop(); + } + + /** + * Test that messages are expired properly on a topic subscription when a + * constant pending limit strategy is set and that future messages are + * dispatched properly so that the consumer isn't blocked + * + * @throws Exception + */ + @Test(timeout = 10000) + public void testExpireWithPendingLimitStrategy() throws Exception { + + //Send 5 messages that are not expired to fill up prefetch + //followed by 5 messages that can be expired + //then another 5 messages that won't expire + //Make sure 10 messages are received + sendMessages(5, 0); + sendMessages(5, 1); + sendMessages(5, 0); + + //should get 10 messages as the middle 5 should expire + assertEquals(10, receiveMessages()); + } + + /** + * Sends {@code count} text messages to the test topic with the producer time to live set to {@code expire}. + * + * @param count number of messages to send + * @throws JMSException if any of the underlying JMS calls fails + */ + protected void sendMessages(int count, int expire) throws JMSException { + MessageProducer producer = session.createProducer(topic); + producer.setTimeToLive(expire); + for (int i = 0; i < count; i++) { + TextMessage textMessage = session.createTextMessage("test"); + producer.send(textMessage); + } + } + + protected int receiveMessages() throws JMSException { + int count = 0; + while (consumer.receive(500) != null) { + count++; + } + return count; + } + +}