[ https://issues.apache.org/jira/browse/AMQ-6851?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gary Tully resolved AMQ-6851. ----------------------------- Resolution: Feedback Received > Messages using Message Groups can arrive out of order when using > CachedMessageGroupMap > -------------------------------------------------------------------------------------- > > Key: AMQ-6851 > URL: https://issues.apache.org/jira/browse/AMQ-6851 > Project: ActiveMQ > Issue Type: Bug > Components: Broker > Affects Versions: 5.12.0, 5.15.2 > Environment: Linux, CentOS 7 > openjdk version "1.8.0_151" > OpenJDK Runtime Environment (build 1.8.0_151-b12) > OpenJDK 64-Bit Server VM (build 25.151-b12, mixed mode) > Reporter: Joshua Montgomery > > The default broker behavior for message groups uses a CachedMessageGroupMap > with a least-recently-used cache with a capacity of 1024. When more than 1024 > group IDs are used, messages can be consumed out-of-order. > Scenario. > Configure two consumers for a queue. > Send a message with group ID '0' that requires a long time to consume. > Send 1024 additional messages with group IDs '1' through '1024' that require > a short time to consume. > Send a message of group ID '0' that requires a short time to consume. > Expected: > The second message in group '0' is consumed *after* the first message in > group '0' > Actual: > The second message in group '0' is consumed *before* the first message in > group '0' has finished. > The LRU cache is evicting the group to consumer mapping for group '0' before > the second message arrives, allowing the second message in group '0' to be > processed by a different consumer than the first message. > Using the MessageGroupHashBucket or the SimpleMessageGroupMap results in the > expected behavior. 
> {code} > package com.example.outoforderjms; > import java.io.Serializable; > import java.time.Instant; > import java.time.ZoneId; > import java.time.format.DateTimeFormatter; > import java.util.Locale; > import javax.jms.ConnectionFactory; > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.pool.PooledConnectionFactory; > import > org.springframework.context.annotation.AnnotationConfigApplicationContext; > import org.springframework.context.annotation.Bean; > import org.springframework.context.annotation.Configuration; > import org.springframework.jms.annotation.EnableJms; > import org.springframework.jms.annotation.JmsListener; > import org.springframework.jms.config.DefaultJmsListenerContainerFactory; > import org.springframework.jms.core.JmsTemplate; > import org.springframework.jms.core.MessagePostProcessor; > @EnableJms > @Configuration > public class OutOfOrderJms { > private static final int MODULUS = 1025; > private static final int COUNT = MODULUS + 1; > private static final String QUEUE_NAME = "MessageGroupTest"; > public static void main(String[] args) { > AnnotationConfigApplicationContext ctx = new > AnnotationConfigApplicationContext(); > ctx.register(OutOfOrderJms.class); > ctx.refresh(); > JmsTemplate template = new JmsTemplate(); > template.setConnectionFactory(CONNECTION_FACTORY); > for (int i = 0; i < COUNT; i++) { > SomeMessage someMessage = new SomeMessage(i, Integer.toString(i % > MODULUS)); > if (someMessage.getGroup().equals("0")) { > System.out.println(getTimeStamp() + " " + > Thread.currentThread().getName() + " producing message " + someMessage); > } > template.convertAndSend(QUEUE_NAME, someMessage, > getMessageGroupPostProcessor(someMessage)); > } > } > private static String getTimeStamp() { > DateTimeFormatter formatter = > DateTimeFormatter.ofPattern("hh:mm:ss:SSSS") > .withLocale(Locale.US) > .withZone(ZoneId.systemDefault()); > return formatter.format(Instant.now()); > } > private static 
MessagePostProcessor > getMessageGroupPostProcessor(Serializable object) { > return message -> { > SomeMessage m = ((SomeMessage) object); > message.setStringProperty( > "JMSXGroupID", m.getGroup()); > return message; > }; > } > @JmsListener(destination = QUEUE_NAME, containerFactory = > "containerFactory") > private void process(SomeMessage someMessage) throws InterruptedException { > // Simulate long-processing message for first message produced. > if (someMessage.getMessage() == 0) { > for (int i = 10; i > 0; i--) { > Thread.sleep(1000); > System.out.println(i + " "); > } > } > if (someMessage.getGroup().equals("0") || > someMessage.getGroup().equals("1")) { > System.out.println(getTimeStamp() + " " + > Thread.currentThread().getName() + " consuming message " + someMessage); > } > } > @Bean > public DefaultJmsListenerContainerFactory containerFactory() { > DefaultJmsListenerContainerFactory factory = new > DefaultJmsListenerContainerFactory(); > factory.setConnectionFactory(CONNECTION_FACTORY); > factory.setConcurrency("1-2"); > return factory; > } > private static ConnectionFactory CONNECTION_FACTORY = new > PooledConnectionFactory( > new ActiveMQConnectionFactory( > "admin", > "admin", > "failover:tcp://localhost:61616") > ); > private static class SomeMessage implements Serializable { > private final int message; > private final String group; > private SomeMessage(int message, String group) { > this.message = message; > this.group = group; > } > int getMessage() { > return message; > } > String getGroup() { > return group; > } > @Override > public String toString() { > return "SomeMessage{" + > "message=" + message + > ", group='" + group + '\'' + > '}'; > } > } > } > {code} > Output shows message 1025 finishing before message 0 > {code} > 03:11:15:1730 main producing message SomeMessage{message=0, group='0'} > 03:11:15:2220 DefaultMessageListenerContainer-2 consuming message > SomeMessage{message=1, group='1'} > 10 > 9 > 8 > 03:11:18:9530 main producing 
message SomeMessage{message=1025, group='0'} > 03:11:18:9540 DefaultMessageListenerContainer-2 consuming message > SomeMessage{message=1025, group='0'} > 7 > 6 > 5 > 4 > 3 > 2 > 1 > 03:11:25:2130 DefaultMessageListenerContainer-1 consuming message > SomeMessage{message=0, group='0'} > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)