User: pkendall
Date: 01/05/13 01:25:20
Modified: src/main/org/jbossmq/server ClientConsumer.java
BasicQueue.java
Log:
Fix multiple subscriber problem.
Pass selector to receiveMessage for receiveNoWait calls.
Revision Changes Path
1.4 +4 -4 jbossmq/src/main/org/jbossmq/server/ClientConsumer.java
Index: ClientConsumer.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/ClientConsumer.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ClientConsumer.java 2001/03/02 01:13:01 1.3
+++ ClientConsumer.java 2001/05/13 08:25:20 1.4
@@ -31,7 +31,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class ClientConsumer implements Task {
@@ -185,7 +185,7 @@
llClone.add( req );
HashMap destinationSubscriptionsClone = (HashMap)destinationSubscriptions.clone();
- destinationSubscriptions.put(req.destination, llClone);
+ destinationSubscriptionsClone.put(req.destination, llClone);
destinationSubscriptions = destinationSubscriptionsClone;
}
}
@@ -274,10 +274,10 @@
if( queue.isTopic ) {
if( req.durableSubscriptionName!=null ) {
String queueId = JMSDestination.durableSubscriptionToQueueId(dc.getClientID(),req.durableSubscriptionName);
- return queue.getExclusiveQueue(queueId).receiveMessage();
+ return queue.getExclusiveQueue(queueId).receiveMessage(req.messageSelector);
}
} else {
- return queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage();
+ return queue.getExclusiveQueue(queue.DEFAULT_QUEUE_ID).receiveMessage(req.messageSelector);
}
}
1.3 +28 -10 jbossmq/src/main/org/jbossmq/server/BasicQueue.java
Index: BasicQueue.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jbossmq/server/BasicQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- BasicQueue.java 2001/03/02 01:13:01 1.2
+++ BasicQueue.java 2001/05/13 08:25:20 1.3
@@ -32,7 +32,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
abstract public class BasicQueue implements Task, AbstractQueue {
@@ -145,17 +145,35 @@
}
- //Used by QueueReceivers for receive(), receive(long wait), and receiveNoWait()
- public SpyMessage receiveMessage() throws JMSException
+ //Used by QueueReceivers for receiveNoWait()
+ public SpyMessage receiveMessage(String selector) throws JMSException
{
- synchronized (messages) {
- if (messages.size()==0)
+ if( selector == null ) {
+ synchronized (messages) {
+ if (messages.size()==0)
+ return null;
+
+ SpyMessage m = (SpyMessage)messages.first();
+ messages.remove(m);
+
+ return m;
+ }
+ } else {
+ synchronized (messages) {
+ if (messages.size()==0)
+ return null;
+
+ Selector s = new Selector( selector );
+ Iterator i = messages.iterator();
+ while( i.hasNext() ) {
+ SpyMessage m = (SpyMessage)i.next();
+ if( s.test(m) ) {
+ messages.remove(m);
+ return m;
+ }
+ }
return null;
-
- SpyMessage m = (SpyMessage)messages.first();
- messages.remove(m);
-
- return m;
+ }
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development