[ https://issues.apache.org/jira/browse/AMQ-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13614947#comment-13614947 ]
Jason Shepherd edited comment on AMQ-1126 at 3/27/13 5:20 AM: -------------------------------------------------------------- We found that message sequencing is not working correctly when using the above approach. Therefore we suggest that there is a bug still in the Resource Adapter. We tested using: JBoss EAP 6.0.1 ActiveMQ RAR 5.8.0 I have attached a test case (amq_reproducer.zip) which demonstrates the issue in that environment. Here is how it works: amq_reproducer.zip: test_amq.jar test_amq_web.war activemq-rar-5.8.0.rar 1 Changes required in standalone-full.xml for registering activemq resource adapter <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0"> <resource-adapters> <resource-adapter> <archive> activemq-rar-5.8.0.rar </archive> <transaction-support>XATransaction</transaction-support> <config-property name="UseInboundSession"> false </config-property> <config-property name="Password"> defaultPassword </config-property> <config-property name="UserName"> defaultUser </config-property> <config-property name="ServerUrl"> vm://localhost </config-property> <connection-definitions> <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:jboss/ConnectionFactory" enabled="true" pool-name="ConnectionFactory"> <xa-pool> <min-pool-size>1</min-pool-size> <max-pool-size>20</max-pool-size> </xa-pool> </connection-definition> </connection-definitions> <admin-objects> <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/queue/MyActiveMQQueue" use-java-context="true" pool-name="MyActiveMQQueue"> <config-property name="PhysicalName"> QueuePhysicalName </config-property> </admin-object> </admin-objects> </resource-adapter> </resource-adapters> </subsystem> test_amq.jar This jar contains two group of MDBs. Three MDBs (AMQTestMDB*) are consumer for AMQ queue QueuePhysicalName (defined in standalone-ha.xml). The other three MDBs (HornetQTestMDB*) are consumer for HornetQ (default implementation) queue queue/HornetQMDBQueue (defined in war's hornetq-jms.xml). The onMessage method call following simple logic public void handleMessage(String id, TextMessage tm) { try { String logMsg = id + " Testing MDB : " + tm.getText() + ", " + this.toString(); System.out.println(logMsg); if (tm.getText().startsWith("Hello")) { // simulate heavy processing of first message long sec = 5; System.out.println(logMsg + " - Heavy processing for " + sec + " seconds. " + tm.getText() + ", " + this.toString()); Random r = new Random(); for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); stop > System.nanoTime();) { r.nextDouble(); } System.out.println(logMsg + " Done!"); } } catch (JMSException e) { e.printStackTrace(); } } test_amq_web.war This war contains clients for the test queues. /test tests AMQ. /test2 tests HornetQ. Following is the logic used: for (int i = 0; i < 5; i++) { msg.setText("Hello " + i + "[" + groupName + "]"); msg2.setText("World " + i + "[" + groupName + "]"); if (useGroup) { msg.setStringProperty("JMSXGroupID", groupName); msg2.setStringProperty("JMSXGroupID", groupName); } qsender.send(msg); qsender.send(msg2); } Each servlet creates 5 threads. Each thread creates two text messages and sends it out 5 times. Executing test cases - ActiveMQ test: http://localhost:8080/test_amq_web/test - ActiveMQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test?group=false - HornetQ test: http://localhost:8080/test_amq_web/test2 - HornetQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test2?group=false Results: When invoking the test_amq_web/test we can see that messages using the same message group are not being processed sequentially. In the logs we get messages in this order: 15:00:47,031 INFO [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 15:00:47,032 INFO [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 - Heavy processing for 5 seconds. Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 ... 15:00:47,066 INFO [stdout] (default-threads - 12) Consume-AMQ3 Testing MDB : World 0[Group-1], com.redhat.gss.test.HandleMessage@5418f143 ... 15:00:52,035 INFO [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 Done! However when invoking test_amq_web/test2 (which uses HornetQ) we can see that the message are being processed sequentially: 15:08:35,740 INFO [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1 Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 15:08:35,744 INFO [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1 Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 - Heavy processing for 5 seconds. Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 ... 15:08:40,784 INFO [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3 Testing MDB : Hello 0[Group-3], com.redhat.gss.test.HandleMessage@bce1791 Done! 15:08:40,788 INFO [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3 Testing MDB : World 0[Group-3], com.redhat.gss.test.HandleMessage@5e8ace0e was (Author: jshepher): We found that message sequencing is not working correctly when using the above approach. Therefore we suggest that there is a bug still in the Resource Adapter. We tested using: JBoss EAP 6.0.1 ActiveMQ RAR 5.8.0 I have attached a test case (amq_reproducer.zip) which demonstrates the issue in that environment. Here is how it works: amq_reproducer.zip: test_amq.jar test_amq_web.war activemq-rar-5.8.0.rar 1 Changes required in standalone-full.xml for registering activemq resource adapter <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0"> <resource-adapters> <resource-adapter> <archive> activemq-rar-5.8.0.rar </archive> <transaction-support>XATransaction</transaction-support> <config-property name="UseInboundSession"> false </config-property> <config-property name="Password"> defaultPassword </config-property> <config-property name="UserName"> defaultUser </config-property> <config-property name="ServerUrl"> vm://localhost </config-property> <connection-definitions> <connection-definition class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" jndi-name="java:jboss/ConnectionFactory" enabled="true" pool-name="ConnectionFactory"> <xa-pool> <min-pool-size>1</min-pool-size> <max-pool-size>20</max-pool-size> </xa-pool> </connection-definition> </connection-definitions> <admin-objects> <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/queue/MyActiveMQQueue" use-java-context="true" pool-name="MyActiveMQQueue"> <config-property name="PhysicalName"> QueuePhysicalName </config-property> </admin-object> </admin-objects> </resource-adapter> </resource-adapters> </subsystem> test_amq.jar This jar contains two group of MDBs. Three MDBs (AMQTestMDB*) are consumer for AMQ queue QueuePhysicalName (defined in standalone-ha.xml). The other three MDBs (HornetQTestMDB*) are consumer for HornetQ (default implementation) queue queue/HornetQMDBQueue (defined in war's hornetq-jms.xml). The onMessage method call following simple logic public void handleMessage(String id, TextMessage tm) { try { String logMsg = id + " Testing MDB : " + tm.getText() + ", " + this.toString(); System.out.println(logMsg); if (tm.getText().startsWith("Hello")) { // simulate heavy processing of first message long sec = 5; System.out.println(logMsg + " - Heavy processing for " + sec + " seconds. " + tm.getText() + ", " + this.toString()); Random r = new Random(); for (long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(5); stop > System.nanoTime();) { r.nextDouble(); } System.out.println(logMsg + " Done!"); } } catch (JMSException e) { e.printStackTrace(); } } test_amq_web.war This war contains clients for the test queues. /test tests AMQ. /test2 tests HornetQ. Following is the logic used: for (int i = 0; i < 5; i++) { msg.setText("Hello " + i + "[" + groupName + "]"); msg2.setText("World " + i + "[" + groupName + "]"); if (useGroup) { msg.setStringProperty("JMSXGroupID", groupName); msg2.setStringProperty("JMSXGroupID", groupName); } qsender.send(msg); qsender.send(msg2); } Each servlet creates 5 threads. Each thread creates two text messages and sends it out 5 times. Executing test cases - ActiveMQ test: http://localhost:8080/test_amq_web/test - ActiveMQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test?group=false - HornetQ test: http://localhost:8080/test_amq_web/test2 - HornetQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test2?group=false Results: When invoking the test_amq_web/test we can see that messages using the same message group are not being processed sequentially. In the logs we get messages in this order: 15:00:47,031 INFO [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 15:00:47,032 INFO [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 - Heavy processing for 5 seconds. Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 ... 15:00:47,066 INFO [stdout] (default-threads - 12) Consume-AMQ3 Testing MDB : World 0[Group-1], com.redhat.gss.test.HandleMessage@5418f143 ... 15:00:52,035 INFO [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 Done! However when invoking test_amq_web/test2 (which uses HornetQ) we can see that the message are being processed sequentially: 15:08:35,740 INFO [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1 Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 15:08:35,744 INFO [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1 Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 - Heavy processing for 5 seconds. Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 ... 15:08:40,784 INFO [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3 Testing MDB : Hello 0[Group-3], com.redhat.gss.test.HandleMessage@bce1791 Done! 15:08:40,788 INFO [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3 Testing MDB : World 0[Group-3], com.redhat.gss.test.HandleMessage@5e8ace0e > The Resource Adapter ignores the JMSXGroupID when dispatching to MDBs > --------------------------------------------------------------------- > > Key: AMQ-1126 > URL: https://issues.apache.org/jira/browse/AMQ-1126 > Project: ActiveMQ > Issue Type: Bug > Components: JCA Container > Affects Versions: 4.0.1 > Environment: Java 1.4.2_08 > JBoss 4.0.4 > ActiveMQ 4.0.1 > Reporter: John Robinson > Fix For: NEEDS_REVIEWED > > Attachments: amq_reproducer.zip, msg-group-test.zip > > > Integrate AMQ into JBoss using the data source, and resource adapter. Create > an outbound queue and an MDB with a pool size of 100. Dispatch several > messages to the outbound queue, setting the JMSXGroupID property on the > message to be the same value each time. In the MDB's onMessage method print > out the MDBs toString (don't override toString) and you should see something > that looks like: > OutQueueProcessorBean@19a7266 > Observe two things: > a) Many messages are processed in parallel > b) Many different values will occur after the @ in the above message, > denoting that more than on MDB instance is being handed messages. > The correct behavior would be to dispatch messages with the same group id to > the same MDB instance in sequence. This would allow messages from different > groups to be processed in parallel, but messages in any one group would be > processed serially, in the order in which they were placed into the queue. -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira