I understand the Spec and do not try to bypass it. But the problem I have is 
not solved by any standard mechanism.

I have messages on a queue that should be processed by MDBs. Messages with the 
same key (a message property) must not be processed in parallel; they must be 
processed in the order in which they arrived on the queue. Messages with 
different keys are independent of each other. We call that KeyStreaming.

More importantly, I think I found the problem during my tests. It seems to be 
either a bug in JBossMQ or a configuration error that I have not found yet. I 
also found a workaround, but I am not sure it is 100% reliable. I will describe 
it at the end of the post and hope you bear with me until then.

OK, first to your questions, then I will show you the interesting parts of my 
code. The concept worked before with Oracle AQ in a non-J2EE environment.

1) That is interesting. Although you are right that there are multiple MDBs, 
the application logic ensures that messages with the same key are processed and 
sent to the queue in the right order! In my case there is a 2-second gap 
between the first set and the second. Nevertheless the client (non-MDB) gets 
the second set first.

Does that mean JBossMQ does not guarantee that kind of global ordering? How can 
I ensure global ordering? Using the priority as a counter seems a crude 
workaround.

2) When the message carries a key, the MDB calls another SessionBean that has 
bean-managed transactions. That means that the onMessage transaction is 
suspended. The SessionBean does:

XASession --> onMessage --> EJB-Call --> XASuspend-->
lock --> beginUserTrans --> send --> endUserTrans --> unlock
End-EJB-Call --> XAResume --> XACommit

The XACommit of the MDB does not concern my session bean, right?

3) The MDB listens with a messageSelector that is mutually exclusive with the 
one used by the SessionBean and its QueueBrowser. The messages received and 
seen by the QueueBrowser are not subject to any race conditions, as there is 
only ever 1 instance that will consume them. Hence, if nobody has received 
them, the QueueBrowser has to see them.

Standard Configuration
Default Server
Java5 RedHat 3 AS
No Changes to MDB configuration.

OK, first of all: the piece of code that sends to the input side of the MDB 
uses the following:

public class KeyStreamer
  | {
  |     private HashSet alreadySent = new HashSet();
  |     public static final String DATA_ONLY_PROPERTY = "queue_keystream_data";
  |     private final static Logger logger = Logger.getLogger(KeyStreamer.class);
  |     public void send(Session session, MessageProducer producer, javax.jms.Message msg) throws JMSException
  |     {
  |         if (producer.getDestination() instanceof Queue)
  |         {
  |             final String key = msg.getStringProperty(MessageProperties.MSG_KEY_PROPERTY);
  |             msg.setBooleanProperty(DATA_ONLY_PROPERTY,true);
  |             if (!alreadySent.contains(key))
  |             {
  |                 final Message notification = session.createMessage();
  |                 notification.setStringProperty(MessageProperties.MSG_KEY_PROPERTY,key);
  |                 producer.send(notification);
  |                 logger.info("Notification for: " + key);
  |                 alreadySent.add(key);
  |             }
  |             producer.send(msg);
  |         }
  |         else
  |             throw new NotImplementedException("Keystreaming on topics is not implemented");
  |     }
  | 
  |     public void reset()
  |     {
  |         alreadySent.clear();
  |     }
  | }
  | 

reset is called after every commit. In short, each transaction contains one 
'notification' message for every key that appears in it.

The MDB has the message selector 'queue_keystream_data is null', so it only 
listens for notifications.
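
To make the two selectors concrete, here is a minimal sketch of how they could 
be built. The property name "msg_key" is only an assumed stand-in for the value 
of MessageProperties.MSG_KEY_PROPERTY, which I have not shown, and the quote 
escaping is an addition for illustration that my real code does not do.

```java
final class KeyStreamSelectors {
    // Property set to true on every data message; notifications do not carry it.
    static final String DATA_ONLY_PROPERTY = "queue_keystream_data";
    // Assumed stand-in for MessageProperties.MSG_KEY_PROPERTY (real value not shown).
    static final String KEY_PROPERTY = "msg_key";

    // Selector for the MDB: matches only messages without the data flag.
    static String notificationSelector() {
        return DATA_ONLY_PROPERTY + " IS NULL";
    }

    // Selector for the session bean: data messages of exactly one key.
    static String dataSelector(String key) {
        // In JMS selector syntax a single quote inside a string literal is doubled.
        String escaped = key.replace("'", "''");
        return DATA_ONLY_PROPERTY + " = TRUE AND " + KEY_PROPERTY + " = '" + escaped + "'";
    }
}
```

A message either has the data flag set to true or does not have it at all, so 
no message can match both selectors.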

The MDB's onMessage method includes the following:

if (message.propertyExists(MessageProperties.MSG_KEY_PROPERTY))
  |             {
  |                 final Destination lJMSDestination = message.getJMSDestination();
  |                 final String key = message.getStringProperty(MessageProperties.MSG_KEY_PROPERTY);
  |                 keyStreamer.create().keyStream(key, (Queue) lJMSDestination, proxy);
  | 
  |             }
  |             else
  |                 proxy.process(message);

proxy is a small handler that sends the message to the output queue.
The KeyStreamer:

    /**
  |      * @ejb.interface-method view-type="local"
  |      * @ejb.transaction type="NotSupported"
  |      */
  |     public void keyStream(String key, Queue jmsDestination, ProcessProxy proxy) throws RBException
  |     {
  | 
  |         if (!tryLock(key, jmsDestination))
  |         {
  |             logger.info("Dump Key: " + key);
  |             return;
  |         }
  |         boolean locked = true;
  |         try
  |         {
  |             do
  |             {
  |                 MessageConsumer consumer = setupKeyStreamedConnection(key, jmsDestination);
  |                 try
  |                 {
  |                     process(consumer, proxy);
  |                 } finally
  |                 {
  |                     ResourceUtils.jms.SafeClose(consumer);
  |                 }
  |                 unLock(key, jmsDestination);
  |                 locked = false;
  |                 try
  |                 {
  |                     logger.info("Check key: " + key);
  |                     final QueueBrowser lBrowser = session.createBrowser(jmsDestination, getMessageSelector(key));
  |                     try
  |                     {
  |                         if (!lBrowser.getEnumeration().hasMoreElements())
  |                             break;
  |                     } finally
  |                     {
  |                         ResourceUtils.jms.SafeClose(lBrowser);
  |                     }
  |                 } catch (JMSException e)
  |                 {
  |                     throw new EJBException(e);
  |                 }
  |                 logger.info("Found keys again: " + key);
  |                 if (!tryLock(key, jmsDestination))
  |                     return;
  |                 locked = true;
  |             } while (true);
  |         } finally
  |         {
  |             logger.info("End Keystreaming: " + key);
  |             if (locked)
  |                 unLock(key, jmsDestination);
  |             deinit();
  |         }
  | 
  |     }
  |     private MessageConsumer setupKeyStreamedConnection(String key, Destination jmsDestination)
  |     {
  |         try
  |         {
  |             setupSession();
  |             return session.createConsumer(jmsDestination, getMessageSelector(key));
  |         } catch (JMSException e)
  |         {
  |             throw new EJBException("Setup of keystreamed jms connection", e);
  |         }
  |     }    
  | 
  |      final protected String getMessageSelector(String key)
  |     {
  |         // keyStreamSelector is "queue_keystream_data = TRUE and MessageProperties.MSG_KEY_PROPERTY = '?'"
  |         return StringUtils.replace(keyStreamSelector, "?", key);
  |     }
  |     protected void process(MessageConsumer consumer, ProcessProxy proxy) throws RBException
  |     {
  |         try
  |         {
  |             connection.start();
  |             final UserTransaction ut = getSessionContext().getUserTransaction();
  |             try
  |             {
  |                 ResourceUtils.transaction.Begin(ut,EJBException.class);
  |                 Message message = consumer.receiveNoWait();
  |                 while (message != null)
  |                 {
  |                     proxy.process(message);
  |                     ResourceUtils.transaction.Commit(ut,EJBException.class);
  |                     ResourceUtils.transaction.Begin(ut,EJBException.class);
  |                     message = consumer.receiveNoWait();
  |                 }
  | 
  |                 ResourceUtils.transaction.Commit(ut,EJBException.class);
  |             } catch (RBException e)
  |             {
  |                 ResourceUtils.transaction.SafeRollback(ut);
  |                 throw e;
  |             }
  |         } catch (JMSException e)
  |         {
  |             throw new EJBException(e);
  |         }
  | 
  |     }
  | 

1) The MDB gets a notification.
2) It tries to lock the key. If it does not get the lock, another MDB has 
already received a notification for the same key and is working on it, so the 
key is the other MDB's responsibility.
3) If it got the lock, it sets up a consumer with a selector that only matches 
the 'data' messages with that key --> getMessageSelector.
4) It consumes and processes all 'data' messages, wrapping each one in a user 
transaction.
5) It unlocks the key.
6) It checks whether the queue received any other messages matching the same 
selector in the meantime --> QueueBrowser. If yes, it starts again with 
tryLock. Otherwise it is done.
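
The six steps can be sketched without any JMS at all. The following is only an 
in-memory model under my own assumptions: the class InMemoryKeyStream and its 
methods are made up for illustration, a map of queues stands in for the 
selector-based consumer and browser, a set stands in for the key lock, and 
there are no transactions in it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

final class InMemoryKeyStream {
    // Stand-in for "data messages on the queue, selected by key".
    private final ConcurrentHashMap<String, Queue<String>> dataByKey = new ConcurrentHashMap<>();
    // Stand-in for the key lock: a key is locked while it is in this set.
    private final Set<String> locks = ConcurrentHashMap.newKeySet();
    // Records the processing order so it can be inspected afterwards.
    final List<String> processed = Collections.synchronizedList(new ArrayList<>());

    void sendData(String key, String payload) {
        dataByKey.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>()).add(payload);
    }

    // Plays the role of the QueueBrowser: looks without consuming.
    boolean hasData(String key) {
        Queue<String> q = dataByKey.get(key);
        return q != null && !q.isEmpty();
    }

    // Step 1: called when a notification for the key arrives.
    void onNotification(String key) {
        // Step 2: somebody else owns the key, so drop the notification.
        if (!locks.add(key)) return;
        boolean locked = true;
        try {
            while (true) {
                // Steps 3 and 4: drain all data messages of this key in order.
                Queue<String> q = dataByKey.get(key);
                String payload;
                while (q != null && (payload = q.poll()) != null) {
                    processed.add(key + ":" + payload);
                }
                // Step 5: unlock.
                locks.remove(key);
                locked = false;
                // Step 6: browse; if nothing arrived in the meantime we are done.
                if (!hasData(key)) return;
                // Something arrived; go again unless another worker took over.
                if (!locks.add(key)) return;
                locked = true;
            }
        } finally {
            if (locked) locks.remove(key);
        }
    }
}
```

Sending "1" and "2" for key A, then calling onNotification("A"), processes A:1 
before A:2; a second notification for A finds nothing and returns immediately.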

As you can see now, the MDB and the SB consume different sets of messages. 
There is one notification for each key in each transaction on the input side. 
This way it should be impossible to miss any data messages because of race 
conditions. The notification 'triggers' the consumption of the related data 
messages. The lock ensures that only one instance is working on them at a time, 
thus ensuring the right processing order. I have already verified that; no 
problems there.

There is a race condition between the last receiveNoWait and the unlock. In 
this small amount of time, the input queue might have received another set of 
messages (notification and data) for that key. In the same window another MDB 
might have consumed the notification, failed to get the lock and dumped the 
notification. The data messages are then on the queue without a trigger! 
Therefore we check whether there are any new data messages after the unlock. 
That is done via the QueueBrowser. We are really only interested in messages 
that have not been consumed by anyone. If we find one, the race condition 
described above did occur and we tryLock again. 
Why use a browser instead of the receiver? I must not change the queue outside 
the lock, because although I might see a new message through the browser, 
another MDB might start consuming the key.
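
To show the interleaving I mean, here is a small single-threaded replay of it. 
It is only a model: the class LostTriggerReplay is hypothetical, the two 
"workers" are just consecutive statements, and a deque and a set stand in for 
the queue and the lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

final class LostTriggerReplay {
    // Returns true if data is left on the queue with no notification to trigger it.
    static boolean replay(boolean recheckAfterUnlock) {
        Deque<String> data = new ArrayDeque<>(); // data messages of key "A"
        Set<String> locks = new HashSet<>();     // held key locks
        String key = "A";

        // Worker 1 holds the lock and has just seen receiveNoWait() return null.
        locks.add(key);

        // A new transaction commits: one notification plus one data message.
        data.add("A:2");
        boolean notificationPending = true;

        // Worker 2 consumes the notification, fails tryLock and dumps it.
        if (!locks.add(key)) {
            notificationPending = false;
        }

        // Worker 1 unlocks.
        locks.remove(key);

        // The recheck: browse after the unlock and re-enter if data is visible.
        if (recheckAfterUnlock && !data.isEmpty() && locks.add(key)) {
            data.clear();      // stands for draining the key under the lock
            locks.remove(key);
        }
        return !data.isEmpty() && !notificationPending;
    }
}
```

Without the recheck the data message stays on the queue with nothing left to 
trigger it; with the recheck the first worker picks it up itself.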

This would all be easier if I could draw you a diagram, but I hope you 
understand it better now.


IMPORTANT:

I found something very important. The client code in my test added a message 
property 'Transaction', to which it assigned a number for each JMS transaction. 
Thus I could track the transaction boundaries and their behaviour in the 
backend.

I found that although I have a notification and a data message in one JMS 
transaction, the notification gets consumed while the data is not yet available 
on the queue.

To me it seems as if the notification message is already read before all other 
messages in that transaction are available.

To check that, I changed the class KeyStreamer so that it sends the 
notifications not before the data messages but in the reset method, which is 
called just before the commit. Hence the notifications (that trigger the MDB) 
are always the last messages on the queue in a JMS transaction.
And all of a sudden my test case worked!!
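
Roughly, the changed sender looks like the sketch below. This is a simplified 
stand-in and not my actual class: DeferredNotificationStreamer is a made-up 
name, and a Consumer<String> replaces the JMS MessageProducer so that the 
example runs without a server.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

final class DeferredNotificationStreamer {
    // Keys seen in the current transaction, in first-seen order.
    private final Set<String> pendingKeys = new LinkedHashSet<>();
    // Hypothetical stand-in for producer.send(...).
    private final Consumer<String> producer;

    DeferredNotificationStreamer(Consumer<String> producer) {
        this.producer = producer;
    }

    // Data messages go out immediately; the notification is only remembered.
    void send(String key, String payload) {
        pendingKeys.add(key);
        producer.accept("data:" + key + ":" + payload);
    }

    // Called just before the commit: notifications become the last messages.
    void reset() {
        for (String key : pendingKeys) {
            producer.accept("notify:" + key);
        }
        pendingKeys.clear();
    }
}
```

With this ordering a consumer that sees a notification can only do so after 
every data message of the same transaction has already been sent.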

Does this have to do with transaction isolation? Where can I check that for 
JBossMQ? I couldn't find it.

View the original post : 
http://www.jboss.org/index.html?module=bb&op=viewtopic&p=3866209#3866209

Reply to the post : 
http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=3866209

