[ 
https://issues.apache.org/jira/browse/AMQ-3607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Furness updated AMQ-3607:
-------------------------------

    Attachment: ActiveMQSlowConsumerManualTest.java

Test case attached as requested
                
> Setting OptimiseAcknowledge on a queue with a prefetch limit causes 
> normal/fast consumers to miss messages when a slow consumer is blocking
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3607
>                 URL: https://issues.apache.org/jira/browse/AMQ-3607
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.5.0
>         Environment: Java: 1.6.0_26-b03-383.jdk
>            Reporter: James Furness
>         Attachments: ActiveMQSlowConsumerManualTest.java
>
>
> The test case below exercises slow consumer handling with a variety of topic 
> policies and SessionFactory/ConnectionFactory settings. The expectation is 
> that a normal (i.e. fast) consumer will continue to receive messages whilst a 
> slow consumer is blocking.
> Without a prefetch limit, the expected behaviour is seen with 
> setOptimizeAcknowledge both true and false.
> If a prefetch limit is set, setOptimizeAcknowledge(true) causes the 
> normal/fast consumer to miss messages whilst the slow consumer is blocking.
> It would be nice to be able to turn on optimizeAcknowledge for performance 
> reasons; however, it is also necessary to set the prefetch limit in order to 
> trigger the SlowConsumerStrategy/MessageEvictionStrategySupport logic.
> {code:title=testDefaultSettings}
> Publisher: Send 0
> SlowConsumer: Receive 0
> FastConsumer: Receive 0
> testDefaultSettings: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettings: Whilst slow consumer blocked:
>               - SlowConsumer Received: 1 [0]
>               - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettings: After slow consumer unblocked:
>               - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
>               - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
> {code:title=testDefaultSettingsWithOptimiseAcknowledge}
> testDefaultSettingsWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 
> 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 
> 25, 26, 27, 28, 29]
> testDefaultSettingsWithOptimiseAcknowledge: Whilst slow consumer blocked:
>               - SlowConsumer Received: 1 [0]
>               - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testDefaultSettingsWithOptimiseAcknowledge: After slow consumer unblocked:
>               - SlowConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
>               - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
> {code:title=testBounded}
> testBounded: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 
> 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testBounded: Whilst slow consumer blocked:
>               - SlowConsumer Received: 1 [0]
>               - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> testBounded: After slow consumer unblocked:
>               - SlowConsumer Received: 10 [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]
>               - FastConsumer Received: 30 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 
> 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
> {code}
> {code:title=testBoundedWithOptimiseAcknowledge}
> testBoundedWithOptimiseAcknowledge: Publisher Sent: 30 [0, 1, 2, 3, 4, 5, 6, 
> 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 
> 27, 28, 29]
> testBoundedWithOptimiseAcknowledge: Whilst slow consumer blocked:
>               - SlowConsumer Received: 1 [0]
>               - FastConsumer Received: 5 [0, 1, 2, 3, 4]
> testBoundedWithOptimiseAcknowledge: After slow consumer unblocked:
>               - SlowConsumer Received: 5 [0, 1, 2, 3, 4]
>               - FastConsumer Received: 5 [0, 1, 2, 3, 4]
> java.lang.AssertionError: Fast consumer missed messages whilst slow consumer 
> was blocking expected:<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 
> 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]> but was:<[0, 1, 2, 
> 3, 4]>
> {code}
> {code:title=ActiveMQSlowConsumerManualTest.java}
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
> import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
> import org.apache.activemq.broker.region.policy.PolicyEntry;
> import org.apache.activemq.broker.region.policy.PolicyMap;
> import org.apache.activemq.command.ActiveMQTopic;
> import org.junit.Assert;
> import org.junit.Ignore;
> import org.junit.Test;
> import javax.jms.Connection;
> import javax.jms.DeliveryMode;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.concurrent.CountDownLatch;
> import java.util.concurrent.atomic.AtomicInteger;
> /**
>  * @author James Furness
>  */
> public class ActiveMQSlowConsumerManualTest {
>     private static final int PORT = 12345;
>     private static final ActiveMQTopic TOPIC = new ActiveMQTopic("TOPIC");
>     private static final String URL = "nio://localhost:" + PORT + 
> "?socket.tcpNoDelay=true";
>     @Test(timeout = 60000)
>     public void testDefaultSettings() throws Exception {
>         runTest("testDefaultSettings", 30, -1, -1, false, false, false, 
> false);
>     }
>     @Test(timeout = 60000)
>     public void testDefaultSettingsWithOptimiseAcknowledge() throws Exception 
> {
>         runTest("testDefaultSettingsWithOptimiseAcknowledge", 30, -1, -1, 
> false, false, true, false);
>     }
>     @Test(timeout = 60000)
>     public void testBounded() throws Exception {
>         runTest("testBounded", 30, 5, 5, false, false, false, false);
>     }
>     @Test(timeout = 60000)
>     public void testBoundedWithOptimiseAcknowledge() throws Exception {
>         runTest("testBoundedWithOptimiseAcknowledge", 30, 5, 5, false, false, 
> true, false);
>     }
>     public void runTest(String name, int sendMessageCount, int prefetchLimit, 
> int messageLimit, boolean evictOldestMessage, boolean disableFlowControl, 
> boolean optimizeAcknowledge, boolean persistent) throws Exception {
>         BrokerService broker = createBroker(persistent);
>         broker.setDestinationPolicy(buildPolicy(TOPIC, prefetchLimit, 
> messageLimit, evictOldestMessage, disableFlowControl));
>         broker.start();
>         // Slow consumer
>         Session slowConsumerSession = buildSession("SlowConsumer", URL, 
> optimizeAcknowledge);
>         final CountDownLatch blockSlowConsumer = new CountDownLatch(1);
>         final AtomicInteger slowConsumerReceiveCount = new AtomicInteger();
>         final List<Integer> slowConsumerReceived = sendMessageCount <= 1000 ? 
> new ArrayList<Integer>() : null;
>         MessageConsumer slowConsumer = createSubscriber(slowConsumerSession,
>                 new MessageListener() {
>                     @Override
>                     public void onMessage(Message message) {
>                         try {
>                             slowConsumerReceiveCount.incrementAndGet();
>                             int count = Integer.parseInt(((TextMessage) 
> message).getText());
>                             if (slowConsumerReceived != null) 
> slowConsumerReceived.add(count);
>                             if (count % 10000 == 0) 
> System.out.println("SlowConsumer: Receive " + count);
>                             blockSlowConsumer.await();
>                         } catch (Exception ignored) {}
>                     }
>                 }
>         );
>         // Fast consumer
>         Session fastConsumerSession = buildSession("FastConsumer", URL, 
> optimizeAcknowledge);
>         final AtomicInteger fastConsumerReceiveCount = new AtomicInteger();
>         final List<Integer> fastConsumerReceived = sendMessageCount <= 1000 ? 
> new ArrayList<Integer>() : null;
>         MessageConsumer fastConsumer = createSubscriber(fastConsumerSession,
>                 new MessageListener() {
>                     @Override
>                     public void onMessage(Message message) {
>                         try {
>                             fastConsumerReceiveCount.incrementAndGet();
>                             int count = Integer.parseInt(((TextMessage) 
> message).getText());
>                             if (fastConsumerReceived != null) 
> fastConsumerReceived.add(count);
>                             if (count % 10000 == 0) 
> System.out.println("FastConsumer: Receive " + count);
>                         } catch (Exception ignored) {}
>                     }
>                 }
>         );
>         // Wait for consumers to connect
>         Thread.sleep(500);
>         // Publisher
>         AtomicInteger sentCount = new AtomicInteger();
>         List<Integer> sent = sendMessageCount <= 1000 ? new 
> ArrayList<Integer>() : null;
>         Session publisherSession = buildSession("Publisher", URL, 
> optimizeAcknowledge);
>         MessageProducer publisher = createPublisher(publisherSession);
>         for (int i = 0; i < sendMessageCount; i++) {
>             sentCount.incrementAndGet();
>             if (sent != null) sent.add(i);
>             if (i % 10000 == 0) System.out.println("Publisher: Send " + i);
>             
> publisher.send(publisherSession.createTextMessage(Integer.toString(i)));
>         }
>         // Wait for messages to arrive
>         Thread.sleep(500);
>         System.out.println(name + ": Publisher Sent: " + sentCount + " " + 
> sent);
>         System.out.println(name + ": Whilst slow consumer blocked:");
>         System.out.println("\t\t- SlowConsumer Received: " + 
> slowConsumerReceiveCount + " " + slowConsumerReceived);
>         System.out.println("\t\t- FastConsumer Received: " + 
> fastConsumerReceiveCount + " " + fastConsumerReceived);
>         // Unblock slow consumer
>         blockSlowConsumer.countDown();
>         // Wait for messages to arrive
>         Thread.sleep(500);
>         System.out.println(name + ": After slow consumer unblocked:");
>         System.out.println("\t\t- SlowConsumer Received: " + 
> slowConsumerReceiveCount + " " + slowConsumerReceived);
>         System.out.println("\t\t- FastConsumer Received: " + 
> fastConsumerReceiveCount + " " + fastConsumerReceived);
>         System.out.println();
>         publisher.close();
>         publisherSession.close();
>         slowConsumer.close();
>         slowConsumerSession.close();
>         fastConsumer.close();
>         fastConsumerSession.close();
>         broker.stop();
>         Assert.assertEquals("Fast consumer missed messages whilst slow consumer was blocking",
>                 sent, fastConsumerReceived);
>         Assert.assertEquals("Slow consumer received incorrect message count", 
> Math.min(sendMessageCount, prefetchLimit + (messageLimit > 0 ? messageLimit : 
> Integer.MAX_VALUE)), slowConsumerReceived.size());
>     }
>     private static BrokerService createBroker(boolean persistent) throws 
> Exception {
>         BrokerService broker = new BrokerService();
>         broker.setBrokerName("TestBroker");
>         broker.setPersistent(persistent);
>         broker.addConnector(URL);
>         return broker;
>     }
>     private static MessageConsumer createSubscriber(Session session, 
> MessageListener messageListener) throws JMSException {
>         MessageConsumer consumer = session.createConsumer(TOPIC);
>         consumer.setMessageListener(messageListener);
>         return consumer;
>     }
>     private static MessageProducer createPublisher(Session session) throws 
> JMSException {
>         MessageProducer producer = session.createProducer(TOPIC);
>         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>         return producer;
>     }
>     private static Session buildSession(String clientId, String url, boolean 
> optimizeAcknowledge) throws JMSException {
>         ActiveMQConnectionFactory connectionFactory = new 
> ActiveMQConnectionFactory(url);
>         connectionFactory.setCopyMessageOnSend(false);
>         connectionFactory.setDisableTimeStampsByDefault(true);
>         connectionFactory.setOptimizeAcknowledge(optimizeAcknowledge);
>         Connection connection = connectionFactory.createConnection();
>         connection.setClientID(clientId);
>         Session session = connection.createSession(false, 
> Session.AUTO_ACKNOWLEDGE);
>         connection.start();
>         return session;
>     }
>     private static PolicyMap buildPolicy(ActiveMQTopic topic, int 
> prefetchLimit, int messageLimit, boolean evictOldestMessage, boolean 
> disableFlowControl) {
>         PolicyMap policyMap = new PolicyMap();
>         PolicyEntry policyEntry = new PolicyEntry();
>         if (evictOldestMessage) {
>             policyEntry.setMessageEvictionStrategy(new 
> OldestMessageEvictionStrategy());
>         }
>         if (disableFlowControl) {
>             policyEntry.setProducerFlowControl(false);
>         }
>         if (prefetchLimit > 0) {
>             policyEntry.setTopicPrefetch(prefetchLimit);
>         }
>         if (messageLimit > 0) {
>             ConstantPendingMessageLimitStrategy messageLimitStrategy = new 
> ConstantPendingMessageLimitStrategy();
>             messageLimitStrategy.setLimit(messageLimit);
>             policyEntry.setPendingMessageLimitStrategy(messageLimitStrategy);
>         }
>         policyMap.put(topic, policyEntry);
>         return policyMap;
>     }
> }
> {code}
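
The testBounded output above (slow consumer ends up with [0, 1, 2, 3, 4, 25, 26, 27, 28, 29]) and the final assertion in runTest can be illustrated with a small stand-alone sketch. This is a toy model only, not ActiveMQ code: the class SlowConsumerModel and its methods simulate and expectedCount are hypothetical names, and the model assumes that the first prefetchLimit messages are dispatched to the blocked consumer and that the pending buffer discards its oldest entry once it exceeds the limit. That eviction order is an assumption chosen because it matches the observed output. The expectedCount helper mirrors the Math.min(...) expression in the test, except that it uses long arithmetic to avoid int overflow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class SlowConsumerModel {

    // Toy model: which message ids does a consumer end up with if it stays
    // blocked while sendCount messages are published?
    // A limit <= 0 is treated as "no limit", as in the test's buildPolicy().
    static List<Integer> simulate(int sendCount, int prefetchLimit, int pendingLimit) {
        List<Integer> dispatched = new ArrayList<>();
        Deque<Integer> pending = new ArrayDeque<>();
        for (int i = 0; i < sendCount; i++) {
            if (prefetchLimit <= 0 || dispatched.size() < prefetchLimit) {
                // Still room in the prefetch window: message goes to the consumer.
                dispatched.add(i);
            } else {
                // Prefetch window full: message waits in the pending buffer.
                pending.addLast(i);
                if (pendingLimit > 0 && pending.size() > pendingLimit) {
                    // Assumption: the oldest pending message is discarded.
                    pending.removeFirst();
                }
            }
        }
        List<Integer> received = new ArrayList<>(dispatched);
        received.addAll(pending);
        return received;
    }

    // Same bound as the test's final assertion, computed in long arithmetic.
    static int expectedCount(int sendCount, int prefetchLimit, int pendingLimit) {
        long bound = (long) prefetchLimit
                + (pendingLimit > 0 ? pendingLimit : Integer.MAX_VALUE);
        return (int) Math.min(sendCount, bound);
    }

    public static void main(String[] args) {
        // Parameters of testBounded: 30 messages, prefetch 5, pending limit 5.
        System.out.println(simulate(30, 5, 5));
        System.out.println(expectedCount(30, 5, 5));
    }
}
```

With the testBounded parameters the model yields ten messages, the first five and the last five, which is what the slow consumer reports after being unblocked. The model says nothing about the fast consumer, whose behaviour with optimizeAcknowledge enabled is the subject of this issue.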

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
