User: norbert
Date: 00/05/17 16:43:09
Modified: src/java/org/spyderMQ JMSServer.java SessionQueue.java
SpyMessageConsumer.java SpyQueue.java SpyTopic.java
SpyTopicSession.java SpyTopicSubscriber.java
Log:
Very basic selector system (there's no parser yet)
Revision Changes Path
1.27 +3 -3 spyderMQ/src/java/org/spyderMQ/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServer.java,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -r1.26 -r1.27
--- JMSServer.java 2000/05/16 18:04:58 1.26
+++ JMSServer.java 2000/05/17 23:43:08 1.27
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.26 $
+ * @version $Revision: 1.27 $
*/
public class JMSServer
implements Runnable
@@ -267,7 +267,7 @@
return q;
}
- //A clonnection is closing
+ //A connection is closing [error or notification]
public synchronized void connectionClosing(SpyDistributedConnection dc)
{
if (dc==null) return;
@@ -294,7 +294,7 @@
SpyDistributedConnection dc2=(SpyDistributedConnection)i2.next();
if (dc.equals(dc2)) {
Log.log("FIXME: The DistributedConnection is registered !");
- //Remove it...
+ //Remove it !!!!!
}
}
}
1.13 +44 -35 spyderMQ/src/java/org/spyderMQ/SessionQueue.java
Index: SessionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- SessionQueue.java 2000/05/15 17:53:58 1.12
+++ SessionQueue.java 2000/05/17 23:43:08 1.13
@@ -11,20 +11,21 @@
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.LinkedList;
+import org.spydermq.selectors.Selector;
/**
* This class is a message queue which is stored (hashed by Destination) in the SpySession object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class SessionQueue
{
// Attributes ----------------------------------------------------
//the MessageConsumer of this queue
- MessageConsumer destination;
+ SpyMessageConsumer destination;
//List of Pending messages (not yet delivered)
LinkedList messages;
//List of messages waiting for acknoledgment
@@ -49,7 +50,7 @@
// Package protected ---------------------------------------------
- void setConsumer(MessageConsumer consumer)
+ void setConsumer(SpyMessageConsumer consumer)
{
destination=consumer;
}
@@ -93,7 +94,7 @@
if (!waitInReceive) return false;
messages.notify();
} else {
- SpyMessage mes=getMessage();
+ SpyMessage mes=getMessage(destination.selector);
if (mes==null) return false;
listener.onMessage(mes);
}
@@ -107,56 +108,64 @@
}
}
- SpyMessage getMessage()
+ SpyMessage getMessage(Selector selector)
{
synchronized (messages) {
while (true) {
try {
- if (messages.size()==0)
- return null;
+ if (messages.size()==0) return null;
SpyMessage mes=(SpyMessage)messages.removeFirst();
- if (!mes.isOutdated()) {
+ if (mes.isOutdated()) {
+ Log.log("SessionQueue: I dropped a message (timeout)");
+ continue;
+ }
+
+ if (selector!=null) {
+ if (!selector.test(mes)) {
+ Log.log("SessionQueue: I dropped a message (selector)");
+ continue;
+ } else {
+ Log.log("SessionQueue: selector evaluates TRUE");
+ }
+ }
- //the SAME Message object is put in
different SessionQueues
- //when we deliver it, we have to
clone() it to insure independance
- SpyMessage message=mes.myClone();
+ //the SAME Message object is put in different
SessionQueues
+ //when we deliver it, we have to clone() it to
insure independance
+ SpyMessage message=mes.myClone();
- if (!transacted) {
- if
(acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
+ if (!transacted) {
+ if
(acknowledgeMode==Session.CLIENT_ACKNOWLEDGE) {
- synchronized
(messagesWaitForAck) {
- //Put the
message in the messagesWaitForAck queue
-
messagesWaitForAck.addLast(message);
- }
-
-
message.setSessionQueue(this);
-
- } else if
(acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
- //DUPS_OK_ACKNOWLEDGE
- } else {
- //AUTO_ACKNOWLEDGE
- //we don't need to
keep this message in a queue
- }
- } else {
-
- //We are linked to a
transacted session
-
synchronized
(messagesWaitForAck) {
//Put the message in
the messagesWaitForAck queue
messagesWaitForAck.addLast(message);
}
+
+ message.setSessionQueue(this);
+
+ } else if
(acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
+ //DUPS_OK_ACKNOWLEDGE
+ } else {
+ //AUTO_ACKNOWLEDGE
+ //we don't need to keep this
message in a queue
+ }
+ } else {
+ //We are linked to a transacted
session
+
+ synchronized (messagesWaitForAck) {
+ //Put the message in the
messagesWaitForAck queue
+
messagesWaitForAck.addLast(message);
}
-
- return message;
+
}
-
- Log.log("Outdated message");
-
+
+ return message;
+
} catch (Exception e) {
Log.error(e);
}
1.9 +8 -4 spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- SpyMessageConsumer.java 2000/05/15 17:53:58 1.8
+++ SpyMessageConsumer.java 2000/05/17 23:43:08 1.9
@@ -12,13 +12,14 @@
import javax.jms.Message;
import java.util.LinkedList;
import java.util.Date;
+import org.spydermq.selectors.Selector;
/**
* This class implements javax.jms.MessageConsumer
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class SpyMessageConsumer
implements MessageConsumer
@@ -33,6 +34,8 @@
protected SessionQueue mySessionQueue;
//Am I closed ?
protected boolean closed;
+ //Do I have a selector
+ public Selector selector;
// Constructor ---------------------------------------------------
@@ -42,6 +45,7 @@
mySessionQueue=sq;
messageListener=null;
closed=false;
+ selector=null;
}
// Public --------------------------------------------------------
@@ -88,7 +92,7 @@
if (closed) return null;
if (!session.modeStop) {
- Message mes=mySessionQueue.getMessage();
+ Message mes=mySessionQueue.getMessage(selector);
if (mes!=null) return mes;
} else Log.log("the connection is stopped !");
@@ -123,7 +127,7 @@
if (closed) return null;
if (!session.modeStop) {
- Message mes=mySessionQueue.getMessage();
+ Message mes=mySessionQueue.getMessage(selector);
if (mes!=null) return mes;
} else Log.log("the connection is stopped !");
@@ -151,7 +155,7 @@
while (true) {
if (session.modeStop) return null;
- return mySessionQueue.getMessage();
+ return mySessionQueue.getMessage(selector);
}
}
1.5 +2 -2 spyderMQ/src/java/org/spyderMQ/SpyQueue.java
Index: SpyQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueue.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SpyQueue.java 2000/05/15 02:41:47 1.4
+++ SpyQueue.java 2000/05/17 23:43:08 1.5
@@ -15,7 +15,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SpyQueue
extends SpyDestination
@@ -39,7 +39,7 @@
public String toString()
{
- return this.getClass().getName()+"@"+name;
+ return "Queue@"+name;
}
// Object override -----------------------------------------------
1.9 +2 -2 spyderMQ/src/java/org/spyderMQ/SpyTopic.java
Index: SpyTopic.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopic.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- SpyTopic.java 2000/05/11 02:00:53 1.8
+++ SpyTopic.java 2000/05/17 23:43:08 1.9
@@ -15,7 +15,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class SpyTopic
extends SpyDestination
@@ -39,7 +39,7 @@
public String toString()
{
- return this.getClass().getName()+"@"+name;
+ return "Topic@"+name;
}
// Object override -----------------------------------------------
1.20 +2 -2 spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSession.java,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -r1.19 -r1.20
--- SpyTopicSession.java 2000/05/17 00:28:05 1.19
+++ SpyTopicSession.java 2000/05/17 23:43:08 1.20
@@ -22,7 +22,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.19 $
+ * @version $Revision: 1.20 $
*/
public class SpyTopicSession
extends SpySession
@@ -50,7 +50,7 @@
if (closed) throw new IllegalStateException("The session is closed");
SessionQueue sessionQueue=new SessionQueue(transacted,acknowledgeMode);
- TopicSubscriber sub=new SpyTopicSubscriber(this,sessionQueue,topic);
+ SpyTopicSubscriber sub=new SpyTopicSubscriber(this,sessionQueue,topic);
sessionQueue.setConsumer(sub);
addConsumer(topic,sessionQueue);
return sub;
1.7 +10 -1 spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java
Index: SpyTopicSubscriber.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyTopicSubscriber.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- SpyTopicSubscriber.java 2000/05/04 23:39:36 1.6
+++ SpyTopicSubscriber.java 2000/05/17 23:43:08 1.7
@@ -10,12 +10,14 @@
import javax.jms.JMSException;
import javax.jms.Topic;
+import org.spydermq.selectors.Selector;
+
/**
* This class implements javax.jms.TopicSubscriber
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class SpyTopicSubscriber
extends SpyMessageConsumer
@@ -55,6 +57,13 @@
{
session.removeConsumer(topic,mySessionQueue);
super.close();
+ }
+
+ // ----- Debug only ----- [not part of the spec]
+
+ public void setSelector(Selector selector)
+ {
+ this.selector=selector;
}
}