[ 
https://issues.apache.org/jira/browse/AMQ-1126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13614947#comment-13614947
 ] 

Jason Shepherd edited comment on AMQ-1126 at 3/27/13 6:41 AM:
--------------------------------------------------------------

We found that message sequencing does not work correctly when using the above 
approach. We therefore believe there is still a bug in the Resource Adapter.

We tested using:
JBoss EAP 6.0.1
ActiveMQ RAR 5.8.0

I have attached a test case (amq_reproducer.zip) which demonstrates the issue 
in that environment. Here is how it works:


    amq_reproducer.zip:
        test_amq.jar
        test_amq_web.war
    activemq-rar-5.8.0.rar
    Changes required in standalone-full.xml for registering the ActiveMQ 
    resource adapter:

{code:xml}

 <subsystem xmlns="urn:jboss:domain:resource-adapters:1.0">
            <resource-adapters>
                <resource-adapter>
                    <archive>
                        activemq-rar-5.8.0.rar
                    </archive>
                    <transaction-support>XATransaction</transaction-support>
                    <config-property name="UseInboundSession">
                        false
                    </config-property>
                    <config-property name="Password">
                        defaultPassword
                    </config-property>
                    <config-property name="UserName">
                        defaultUser
                    </config-property>
                    <config-property name="ServerUrl">
                        vm://localhost
                    </config-property>
                    <connection-definitions>
                        <connection-definition
                            class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory"
                            jndi-name="java:jboss/ConnectionFactory" enabled="true"
                            pool-name="ConnectionFactory">
                            <xa-pool>
                                <min-pool-size>1</min-pool-size>
                                <max-pool-size>20</max-pool-size>
                            </xa-pool>
                        </connection-definition>
                    </connection-definitions>
                    <admin-objects>
                        <admin-object
                            class-name="org.apache.activemq.command.ActiveMQQueue"
                            jndi-name="java:jboss/queue/MyActiveMQQueue" use-java-context="true"
                            pool-name="MyActiveMQQueue">
                            <config-property name="PhysicalName">
                                QueuePhysicalName
                            </config-property>
                        </admin-object>
                    </admin-objects>
                </resource-adapter>
            </resource-adapters>
        </subsystem>
{code}
test_amq.jar
This jar contains two groups of MDBs. Three MDBs (AMQTestMDB*) are consumers 
for the AMQ queue QueuePhysicalName (defined in standalone-ha.xml). The other 
three MDBs (HornetQTestMDB*) are consumers for the HornetQ (default 
implementation) queue queue/HornetQMDBQueue (defined in the war's 
hornetq-jms.xml).

The onMessage method calls the following simple logic:
{code:borderStyle=solid}

// Requires: java.util.Random, java.util.concurrent.TimeUnit,
// javax.jms.JMSException, javax.jms.TextMessage
public void handleMessage(String id, TextMessage tm) {
    try {
        String logMsg = id + " Testing MDB : " + tm.getText()
                + ", " + this.toString();
        System.out.println(logMsg);
        if (tm.getText().startsWith("Hello")) {
            // simulate heavy processing of first message
            long sec = 5;
            System.out.println(logMsg + " - Heavy processing for " + sec
                    + " seconds. " + tm.getText() + ", " + this.toString());
            Random r = new Random();
            // busy-wait so the consuming thread stays occupied for sec seconds
            long stop = System.nanoTime() + TimeUnit.SECONDS.toNanos(sec);
            while (stop - System.nanoTime() > 0) {
                r.nextDouble();
            }
            System.out.println(logMsg + " Done!");
        }
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
{code}
test_amq_web.war
This war contains the clients for the test queues: /test tests AMQ and /test2 
tests HornetQ. The following logic is used:
{code:borderStyle=solid}
    for (int i = 0; i < 5; i++) {
        msg.setText("Hello " + i + "[" + groupName + "]");
        msg2.setText("World " + i + "[" + groupName + "]");

        if (useGroup) {
            msg.setStringProperty("JMSXGroupID", groupName);
            msg2.setStringProperty("JMSXGroupID", groupName);
        }

        qsender.send(msg);
        qsender.send(msg2);
    }
{code}
Each servlet creates 5 threads. Each thread creates two text messages and sends 
them out 5 times.

Executing test cases:
- ActiveMQ test: http://localhost:8080/test_amq_web/test
- ActiveMQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test?group=false
- HornetQ test: http://localhost:8080/test_amq_web/test2
- HornetQ test without JMSXGroupID: http://localhost:8080/test_amq_web/test2?group=false

Results:

When invoking test_amq_web/test, we can see that messages in the same message 
group are not processed sequentially. In the logs the messages appear in this 
order, with World 0[Group-1] handled on another thread before Hello 0[Group-1] 
is done:
{code}
15:00:47,031 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730
15:00:47,032 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 - Heavy processing for 5 seconds. Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730
...
15:00:47,066 INFO  [stdout] (default-threads - 12) Consume-AMQ3 Testing MDB : World 0[Group-1], com.redhat.gss.test.HandleMessage@5418f143
...
15:00:52,035 INFO  [stdout] (default-threads - 7) Consume-AMQ3 Testing MDB : Hello 0[Group-1], com.redhat.gss.test.HandleMessage@53e59730 Done!
{code}
However, when invoking test_amq_web/test2 (which uses HornetQ), we can see that 
the messages are processed sequentially:
{code}
15:08:35,740 INFO  [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1 Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88
15:08:35,744 INFO  [stdout] (Thread-1 (HornetQ-client-global-threads-1277578712)) Consume-HQ1 Testing MDB : Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88 - Heavy processing for 5 seconds. Hello 0[Group-0], com.redhat.gss.test.HandleMessage@67092e88
...
15:08:40,784 INFO  [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3 Testing MDB : Hello 0[Group-3], com.redhat.gss.test.HandleMessage@bce1791 Done!
15:08:40,788 INFO  [stdout] (Thread-0 (HornetQ-client-global-threads-1277578712)) Consume-HQ3 Testing MDB : World 0[Group-3], com.redhat.gss.test.HandleMessage@5e8ace0e
{code}
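
For longer logs, the same comparison could be done mechanically. The following is only a rough sketch and is not part of amq_reproducer.zip: the class GroupInterleavingCheck and its method hasInterleaving are hypothetical names, and it assumes each log entry sits on one line in the "Testing MDB : <text>[<group>]" format printed by handleMessage. It reports a problem when a different message of the same group is logged between a "Hello" entry and its "Done!" entry.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for reading the reproducer's stdout; not part of the attachment. */
class GroupInterleavingCheck {
    // Matches e.g. "Testing MDB : Hello 0[Group-1]" as printed by handleMessage.
    private static final Pattern ENTRY =
            Pattern.compile("Testing MDB : (\\w+ \\d+)\\[([^\\]]+)\\]");

    /** Returns true if a group logs another message while a "Hello" is still in progress. */
    static boolean hasInterleaving(List<String> logLines) {
        // group name -> text of the "Hello" message currently being processed
        Map<String, String> busy = new HashMap<>();
        for (String line : logLines) {
            Matcher m = ENTRY.matcher(line);
            if (!m.find()) {
                continue; // not a handleMessage line (for example "...")
            }
            String message = m.group(1);
            String group = m.group(2);
            String inProgress = busy.get(group);
            if (inProgress != null && !inProgress.equals(message)) {
                return true; // same group, different message, before "Done!"
            }
            if (line.endsWith("Done!")) {
                busy.remove(group);
            } else if (message.startsWith("Hello")) {
                busy.put(group, message);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Abbreviated versions of the two log excerpts (timestamps and hashes shortened).
        List<String> amq = Arrays.asList(
                "Consume-AMQ3 Testing MDB : Hello 0[Group-1], h@1",
                "Consume-AMQ3 Testing MDB : World 0[Group-1], h@2",
                "Consume-AMQ3 Testing MDB : Hello 0[Group-1], h@1 Done!");
        List<String> hornetq = Arrays.asList(
                "Consume-HQ3 Testing MDB : Hello 0[Group-3], h@1",
                "Consume-HQ3 Testing MDB : Hello 0[Group-3], h@1 Done!",
                "Consume-HQ3 Testing MDB : World 0[Group-3], h@2");
        System.out.println(hasInterleaving(amq));     // prints true
        System.out.println(hasInterleaving(hornetq)); // prints false
    }
}
```

The check only looks at ordering within a group, so entries from other groups appearing in between are not counted as a problem.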
                
> The Resource Adapter ignores the JMSXGroupID when dispatching to MDBs
> ---------------------------------------------------------------------
>
>                 Key: AMQ-1126
>                 URL: https://issues.apache.org/jira/browse/AMQ-1126
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JCA Container
>    Affects Versions: 4.0.1
>         Environment: Java 1.4.2_08
> JBoss 4.0.4
> ActiveMQ 4.0.1
>            Reporter: John Robinson
>             Fix For: NEEDS_REVIEWED
>
>         Attachments: amq_reproducer.zip, msg-group-test.zip
>
>
> Integrate AMQ into JBoss using the data source, and resource adapter.  Create 
> an outbound queue and an MDB with a pool size of 100.  Dispatch several 
> messages to the outbound queue, setting the JMSXGroupID property on the 
> message to be the same value each time.  In the MDB's onMessage method print 
> out the MDB's toString (don't override toString) and you should see something 
> that looks like:
> OutQueueProcessorBean@19a7266
> Observe two things:
> a) Many messages are processed in parallel
> b) Many different values will occur after the @ in the above message, 
> denoting that more than one MDB instance is being handed messages.
> The correct behavior would be to dispatch messages with the same group id to 
> the same MDB instance in sequence.  This would allow messages from different 
> groups to be processed in parallel, but messages in any one group would be 
> processed serially, in the order in which they were placed into the queue.
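
The behavior described in the issue above (serial within a group, parallel across groups) can be illustrated in plain Java. This is a sketch under stated assumptions only: GroupOrderedDispatcher, dispatch and demo are hypothetical names, the code does not use the JMS API, and it is not how ActiveMQ or the Resource Adapter is implemented. It routes every message of a group to one single-threaded worker, so a slow "Hello" always finishes before the "World" that follows it in the same group.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of per-group ordering; not ActiveMQ or JCA code. */
class GroupOrderedDispatcher {
    // One single-threaded worker per group ID: groups run in parallel,
    // messages inside a group run one after another in submission order.
    private final Map<String, ExecutorService> workers = new ConcurrentHashMap<>();
    private final List<String> processed = Collections.synchronizedList(new ArrayList<>());

    void dispatch(String groupId, String text) {
        workers.computeIfAbsent(groupId, g -> Executors.newSingleThreadExecutor())
               .execute(() -> handle(text));
    }

    private void handle(String text) {
        if (text.startsWith("Hello")) {
            try {
                Thread.sleep(50); // stand-in for the reproducer's heavy processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        processed.add(text);
    }

    /** Stops all workers, waits for queued messages, and returns the completion order. */
    List<String> shutdownAndGetProcessed() {
        try {
            for (ExecutorService worker : workers.values()) {
                worker.shutdown();
                worker.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return new ArrayList<>(processed);
    }

    /** Sends Hello/World pairs for two groups, similar to the test servlet's loop. */
    static List<String> demo() {
        GroupOrderedDispatcher dispatcher = new GroupOrderedDispatcher();
        for (int i = 0; i < 2; i++) {
            for (String group : new String[] {"Group-0", "Group-1"}) {
                dispatcher.dispatch(group, "Hello " + i + "[" + group + "]");
                dispatcher.dispatch(group, "World " + i + "[" + group + "]");
            }
        }
        return dispatcher.shutdownAndGetProcessed();
    }

    public static void main(String[] args) {
        // Order across groups may vary between runs; order inside a group does not.
        for (String text : demo()) {
            System.out.println(text);
        }
    }
}
```

A broker would also need to handle consumers joining and leaving, which this sketch leaves out; it is only meant to show the ordering guarantee that the test case checks for.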
