[ https://issues.apache.org/jira/browse/AMQ-3607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
James Furness updated AMQ-3607: ------------------------------- Attachment: ActiveMQSlowConsumerManualTest.java Test case attached as requested > Setting OptimiseAcknowledge on a queue with a prefetch limit causes > normal/fast consumers to miss messages when a slow consumer is blocking > ------------------------------------------------------------------------------------------------------------------------------------------- > > Key: AMQ-3607 > URL: https://issues.apache.org/jira/browse/AMQ-3607 > Project: ActiveMQ > Issue Type: Bug > Components: Broker > Affects Versions: 5.5.0 > Environment: Java: 1.6.0_26-b03-383.jdk > Reporter: James Furness > Attachments: ActiveMQSlowConsumerManualTest.java > > > The test case below exercises slow consumer handling with a variety of topic > policies and SessionFactory/ConnectionFactory settings. The expectation is > that a normal (i.e. fast) consumer will continue to receive messages whilst a > slow consumer is blocking. > Without a prefetch limit, the expected behaviour is seen with > setOptimizeAcknowledge set to either true or false. > If a prefetch limit is set, setOptimizeAcknowledge(true) causes the > normal/fast consumer to miss messages whilst the slow consumer is blocking. > It would be nice to be able to turn on OptimiseAcknowledge for performance > reasons; however, it is also necessary to set the prefetch limit in order to > trigger SlowConsumerStrategy/MessageEvictionStrategySupport logic. 
> {code:title=testDefaultSettings} > Publisher: Send 0 > SlowConsumer: Receive 0 > FastConsumer: Receive 0 > testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > testDefaultSettings: Whilst slow consumer blocked: > - SlowConsumer Received: 1 [0] > - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > testDefaultSettings: After slow consumer unblocked: > - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > {code} > {code:title=testDefaultSettingsWithOptimiseAcknowledge} > testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, > 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, > 25, 26, 27, 28, 29] > testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked: > - SlowConsumer Received: 1 [0] > - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked: > - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > {code} > {code:title=testBounded} > testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, > 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > testBounded: Whilst slow consumer blocked: > - SlowConsumer Received: 1 [0] > - FastConsumer Received: 30 [0, 1, 2, 3, 4, 
5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > testBounded: After slow consumer unblocked: > - SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29] > - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, > 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29] > {code} > {code:title=testBoundedWithOptimiseAcknowledge} > testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, > 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, > 27, 28, 29] > testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked: > - SlowConsumer Received: 1 [0] > - FastConsumer Received: 5 [0, 1, 2, 3, 4] > testBoundedWithOptimiseAcknowledge: After slow consumer unblocked: > - SlowConsumer Received: 5 [0, 1, 2, 3, 4] > - FastConsumer Received: 5 [0, 1, 2, 3, 4] > java.lang.AssertionError: Fast consumer missed messages whilst slow consumer > was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, > 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2, > 3, 4]> > {code} > {code:title=ActiveMQSlowConsumerManualTest.java} > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.broker.BrokerService; > import > org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy; > import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy; > import org.apache.activemq.broker.region.policy.PolicyEntry; > import org.apache.activemq.broker.region.policy.PolicyMap; > import org.apache.activemq.command.ActiveMQTopic; > import org.junit.Assert; > import org.junit.Ignore; > import org.junit.Test; > import javax.jms.Connection; > import javax.jms.DeliveryMode; > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageConsumer; > import javax.jms.MessageListener; > import javax.jms.MessageProducer; > import javax.jms.Session; > import 
javax.jms.TextMessage; > import java.util.ArrayList; > import java.util.List; > import java.util.concurrent.CountDownLatch; > import java.util.concurrent.atomic.AtomicInteger; > /** > * @author James Furness > */ > public class ActiveMQSlowConsumerManualTest { > private static final int PORT = 12345; > private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC"); > private static final String URL = "nio://localhost:" + PORT + > "?socket.tcpNoDelay=true"; > @Test(timeout = 60000) > public void testDefaultSettings() throws Exception { > runTest("testDefaultSettings", 30, -1, -1, false, false, false, > false); > } > @Test(timeout = 60000) > public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception > { > runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, > false, false, true, false); > } > @Test(timeout = 60000) > public void testBounded() throws Exception { > runTest("testBounded", 30, 5, 5, false, false, false, false); > } > @Test(timeout = 60000) > public void testBoundedWithOptimiseAcknowledge() throws Exception { > runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false, > true, false); > } > public void runTest(String name, int sendMessageCount, int prefetchLimit, > int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, > boolean optimizeAcknowledge, boolean persistent) throws Exception { > BrokerService broker = createBroker(persistent); > broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, > messageLimit, evictOldestMessage, disableFlowControl)); > broker.start(); > // Slow consumer > Session slowConsumerSession = buildSession("SlowConsumer", URL, > optimizeAcknowledge); > final CountDownLatch blockSlowConsumer = new CountDownLatch(1); > final AtomicInteger slowConsumerReceiveCount = new AtomicInteger(); > final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? 
> new ArrayList<Integer>() : null; > MessageConsumer slowConsumer = createSubscriber(slowConsumerSession, > new MessageListener() { > @Override > public void onMessage(Message message) { > try { > slowConsumerReceiveCount.incrementAndGet(); > int count = Integer.parseInt(((TextMessage) > message).getText()); > if (slowConsumerReceived != null) > slowConsumerReceived.add(count); > if (count % 10000 == 0) > System.out.println("SlowConsumer: Receive " + count); > blockSlowConsumer.await(); > } catch (Exception ignored) {} > } > } > ); > // Fast consumer > Session fastConsumerSession = buildSession("FastConsumer", URL, > optimizeAcknowledge); > final AtomicInteger fastConsumerReceiveCount = new AtomicInteger(); > final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? > new ArrayList<Integer>() : null; > MessageConsumer fastConsumer = createSubscriber(fastConsumerSession, > new MessageListener() { > @Override > public void onMessage(Message message) { > try { > fastConsumerReceiveCount.incrementAndGet(); > int count = Integer.parseInt(((TextMessage) > message).getText()); > if (fastConsumerReceived != null) > fastConsumerReceived.add(count); > if (count % 10000 == 0) > System.out.println("FastConsumer: Receive " + count); > } catch (Exception ignored) {} > } > } > ); > // Wait for consumers to connect > Thread.sleep(500); > // Publisher > AtomicInteger sentCount = new AtomicInteger(); > List<Integer> sent = sendMessageCount <= 1000 ? 
new > ArrayList<Integer>() : null; > Session publisherSession = buildSession("Publisher", URL, > optimizeAcknowledge); > MessageProducer publisher = createPublisher(publisherSession); > for (int i = 0; i < sendMessageCount; i++) { > sentCount.incrementAndGet(); > if (sent != null) sent.add(i); > if (i % 10000 == 0) System.out.println("Publisher: Send " + i); > > publisher.send(publisherSession.createTextMessage(Integer.toString(i))); > } > // Wait for messages to arrive > Thread.sleep(500); > System.out.println(name + ": Publisher Sent: " + sentCount + " " + > sent); > System.out.println(name + ": Whilst slow consumer blocked:"); > System.out.println("\t\t- SlowConsumer Received: " + > slowConsumerReceiveCount + " " + slowConsumerReceived); > System.out.println("\t\t- FastConsumer Received: " + > fastConsumerReceiveCount + " " + fastConsumerReceived); > // Unblock slow consumer > blockSlowConsumer.countDown(); > // Wait for messages to arrive > Thread.sleep(500); > System.out.println(name + ": After slow consumer unblocked:"); > System.out.println("\t\t- SlowConsumer Received: " + > slowConsumerReceiveCount + " " + slowConsumerReceived); > System.out.println("\t\t- FastConsumer Received: " + > fastConsumerReceiveCount + " " + fastConsumerReceived); > System.out.println(); > publisher.close(); > publisherSession.close(); > slowConsumer.close(); > slowConsumerSession.close(); > fastConsumer.close(); > fastConsumerSession.close(); > broker.stop(); > Assert.assertEquals("Fast consumer missed messages whilst slow > consumer was blocking", sent, fastConsumerReceived); > Assert.assertEquals("Slow consumer received incorrect message count", > Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? 
messageLimit : > Integer.MAX_VALUE)), slowConsumerReceived.size()); > } > private static BrokerService createBroker(boolean persistent) throws > Exception { > BrokerService broker = new BrokerService(); > broker.setBrokerName("TestBroker"); > broker.setPersistent(persistent); > broker.addConnector(URL); > return broker; > } > private static MessageConsumer createSubscriber(Session session, > MessageListener messageListener) throws JMSException { > MessageConsumer consumer = session.createConsumer(TOPIC); > consumer.setMessageListener(messageListener); > return consumer; > } > private static MessageProducer createPublisher(Session session) throws > JMSException { > MessageProducer producer = session.createProducer(TOPIC); > producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); > return producer; > } > private static Session buildSession(String clientId, String url, boolean > optimizeAcknowledge) throws JMSException { > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(url); > connectionFactory.setCopyMessageOnSend(false); > connectionFactory.setDisableTimeStampsByDefault(true); > connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge); > Connection connection = connectionFactory.createConnection(); > connection.setClientID(clientId); > Session session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > connection.start(); > return session; > } > private static PolicyMap buildPolicy(ActiveMQTopic topic, int > prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean > disableFlowControl) { > PolicyMap policyMap = new PolicyMap(); > PolicyEntry policyEntry = new PolicyEntry(); > if (evictOldestMessage) { > policyEntry.setMessageEvictionStrategy(new > OldestMessageEvictionStrategy()); > } > if (disableFlowControl) { > policyEntry.setProducerFlowControl(false); > } > if (prefetchLimit > 0) { > policyEntry.setTopicPrefetch(prefetchLimit); > } > if (messageLimit > 0) { > ConstantPendingMessageLimitStrategy 
messageLimitStrategy = new > ConstantPendingMessageLimitStrategy(); > messageLimitStrategy.setLimit(messageLimit); > policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy); > } > policyMap.put(topic, policyEntry); > return policyMap; > } > } > {code} -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira