User: norbert
Date: 00/05/24 18:18:35
Modified: src/java/org/spyderMQ JMSServer.java SessionQueue.java
SpyConnection.java SpyMessageConsumer.java
SpyQueueConnection.java SpyQueueReceiver.java
SpyQueueSender.java SpyQueueSession.java
SpySession.java
Added: src/java/org/spyderMQ ConnectionQueue.java
Log:
More for P2P :
Create a new class ( ConnectionQueue ) which holds the subscribers HashSet
Revision Changes Path
1.32 +9 -9 spyderMQ/src/java/org/spyderMQ/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/JMSServer.java,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -r1.31 -r1.32
--- JMSServer.java 2000/05/22 17:32:04 1.31
+++ JMSServer.java 2000/05/25 01:18:33 1.32
@@ -22,7 +22,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.31 $
+ * @version $Revision: 1.32 $
*/
public class JMSServer
implements Runnable
@@ -224,30 +224,30 @@
public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection
dc)
{
- SpyTemporaryTopic t=new SpyTemporaryTopic("JMS_TT"+(new
Integer(lastTemporaryTopic++).toString()),dc);
+ SpyTemporaryTopic topic=new SpyTemporaryTopic("JMS_TT"+(new
Integer(lastTemporaryTopic++).toString()),dc);
synchronized (messageQueue) {
- JMSServerQueue queue=new JMSServerQueue(t,dc,this);
+ JMSServerQueue queue=new JMSServerQueue(topic,dc,this);
HashMap newMap=(HashMap)messageQueue.clone();
- newMap.put(t,queue);
+ newMap.put(topic,queue);
messageQueue=newMap;
}
- return t;
+ return topic;
}
public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection
dc)
{
- SpyTemporaryQueue q=new SpyTemporaryQueue("JMS_TQ"+(new
Integer(lastTemporaryQueue++).toString()),dc);
+ SpyTemporaryQueue newQueue=new SpyTemporaryQueue("JMS_TQ"+(new
Integer(lastTemporaryQueue++).toString()),dc);
synchronized (messageQueue) {
- JMSServerQueue queue=new JMSServerQueue(q,dc,this);
+ JMSServerQueue sessionQueue=new
JMSServerQueue(newQueue,dc,this);
HashMap newMap=(HashMap)messageQueue.clone();
- newMap.put(q,queue);
+ newMap.put(newQueue,sessionQueue);
messageQueue=newMap;
}
- return q;
+ return newQueue;
}
//A connection is closing [error or notification]
1.16 +20 -13 spyderMQ/src/java/org/spyderMQ/SessionQueue.java
Index: SessionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SessionQueue.java,v
retrieving revision 1.15
retrieving revision 1.16
diff -u -r1.15 -r1.16
--- SessionQueue.java 2000/05/24 19:17:18 1.15
+++ SessionQueue.java 2000/05/25 01:18:33 1.16
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.15 $
+ * @version $Revision: 1.16 $
*/
public class SessionQueue
{
@@ -28,21 +28,21 @@
//List of messages waiting for acknoledgment
LinkedList messagesWaitingForAck;
- //Is the session transacted ?
- boolean transacted;
- //What is the message acknowledgment mode ?
- int acknowledgeMode;
//the MessageConsumers linked to this queue
HashSet subscribers;
+ //Number of listening receivers
+ int NumListeningSubscribers;
+ //My SpySession
+ SpySession session;
// Constructor ---------------------------------------------------
- SessionQueue(boolean transacted,int acknowledgeMode)
+ SessionQueue(SpySession session)
{
messagesWaitingForAck=new LinkedList();
subscribers=new HashSet();
- this.transacted=transacted;
- this.acknowledgeMode=acknowledgeMode;
+ this.session=session;
+ NumListeningSubscribers=0;
}
// Package protected ---------------------------------------------
@@ -144,11 +144,6 @@
}
}
- void notifyReceiverStopped(boolean newMode)
- {
- //For Queues. Not yet implemented -
- }
-
synchronized void addConsumer(SpyMessageConsumer consumer)
{
consumer.setSessionQueue(this);
@@ -163,6 +158,18 @@
newSet.remove(consumer);
subscribers=newSet;
return subscribers.size()==0;
+ }
+
+ synchronized void changeNumListening(int val)
+ {
+ NumListeningSubscribers+=val;
+
+ if (val==-1&&NumListeningSubscribers==0) {
+
((SpyQueueConnection)session.connection).changeNumListening(val);
+ } else if (val==1&&NumListeningSubscribers==1) {
+
((SpyQueueConnection)session.connection).changeNumListening(val);
+ }
+
}
}
1.30 +50 -49 spyderMQ/src/java/org/spyderMQ/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyConnection.java,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -r1.29 -r1.30
--- SpyConnection.java 2000/05/19 19:28:49 1.29
+++ SpyConnection.java 2000/05/25 01:18:33 1.30
@@ -29,7 +29,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.29 $
+ * @version $Revision: 1.30 $
*/
public class SpyConnection
implements Connection, Serializable
@@ -43,13 +43,13 @@
protected String clientID;
//the distributed object which receives messages from the JMS server
protected SpyDistributedConnection distributedConnection;
- //HashMap of (HashSet of Sessions) by Destination
- public HashMap subscribers;
+ //HashMap of ConnectionQueue by Destination
+ public HashMap destinations;
//LinkedList of all created sessions by this connection
HashSet createdSessions;
//Last message ID returned
private int lastMessageID;
- //Am I in stopped mode ?
+ //Is the connection stopped ?
protected boolean modeStop;
//Is the connection closed ?
boolean closed;
@@ -64,7 +64,7 @@
{
//Set the attributes
provider = theServer;
- subscribers=new HashMap();
+ destinations=new HashMap();
createdSessions=new HashSet();
distributedConnection=null;
closed=false;
@@ -183,11 +183,11 @@
try {
- //Remove it from the subscribers list
- synchronized (subscribers) {
- HashMap newMap=(HashMap)subscribers.clone();
+ //Remove it from the destinations list
+ synchronized (destinations) {
+ HashMap newMap=(HashMap)destinations.clone();
newMap.remove(dest);
- subscribers=newMap;
+ destinations=newMap;
}
//Notify its sessions that this TemporaryDestination is going
to be deleted()
@@ -240,33 +240,30 @@
if (distributedConnection==null) createReceiver();
Log.log("Connection: addSession(dest="+dest.toString()+")");
+
- HashSet sub=(HashSet)subscribers.get(dest);
- if (sub==null) {
-
- synchronized (subscribers) {
- sub=new HashSet();
- sub.add(who);
- HashMap newSubscribers=(HashMap)subscribers.clone();
- newSubscribers.put(dest,sub);
- subscribers=newSubscribers;
- }
-
- try {
- provider.subscribe(dest,distributedConnection);
- } catch (Exception e) {
- failureHandler(e,"Cannot subscribe to this
Destination");
- }
-
- } else {
-
- synchronized (subscribers) {
- HashSet newSub=(HashSet)sub.clone();
- newSub.add(who);
- subscribers.put(dest,newSub);
+ try {
+
+ synchronized (destinations) {
+
+ ConnectionQueue
connectionQueue=(ConnectionQueue)destinations.get(dest);
+
+ if (connectionQueue==null) {
+ connectionQueue=new ConnectionQueue(dest,this);
+ connectionQueue.addSession(who);
+ HashMap
newDestinations=(HashMap)destinations.clone();
+ newDestinations.put(dest,connectionQueue);
+ destinations=newDestinations;
+
provider.subscribe(dest,distributedConnection);
+ } else {
+ connectionQueue.addSession(who);
+ }
}
-
- }
+
+ } catch (Exception e) {
+ failureHandler(e,"Cannot subscribe to this Destination");
+ }
+
}
//The session does not need to recieve the messages to Destination dest
@@ -277,28 +274,32 @@
Log.log("Connection: removeSession(dest="+dest.toString()+")");
try {
- synchronized (subscribers) {
- HashSet sub=(HashSet)subscribers.get(dest);
- if (sub!=null) {
- HashSet newSub=(HashSet)sub.clone();
- newSub.remove(who);
- if (newSub.isEmpty()) {
- HashMap
newSubscribers=(HashMap)subscribers.clone();
- newSubscribers.remove(dest);
- subscribers=newSubscribers;
+
+ synchronized (destinations) {
+
+ ConnectionQueue
connectionQueue=(ConnectionQueue)destinations.get(dest);
+
+ if (connectionQueue!=null) {
+ boolean
empty=connectionQueue.removeSession(who);
+ if (empty) {
+ HashMap
newDestinations=(HashMap)destinations.clone();
+ newDestinations.remove(dest);
+ destinations=newDestinations;
provider.unsubscribe(dest,distributedConnection);
- } else subscribers.put(dest,newSub);
+ }
} else {
//this should not happen
- HashMap
newSubscribers=(HashMap)subscribers.clone();
- newSubscribers.remove(dest);
- subscribers=newSubscribers;
+ HashMap
newDestinations=(HashMap)destinations.clone();
+ newDestinations.remove(dest);
+ destinations=newDestinations;
provider.unsubscribe(dest,distributedConnection);
}
+
}
+
} catch (Exception e) {
failureHandler(e,"Cannot unsubscribe to this destination");
- }
+ }
}
@@ -333,7 +334,7 @@
createdSessions.remove(who);
}
- //This session should not be in the subscribers object anymore.
+ //This session should not be in the "destinations" object anymore.
//We could check this, though
}
1.13 +2 -2 spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java
Index: SpyMessageConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyMessageConsumer.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- SpyMessageConsumer.java 2000/05/24 19:17:18 1.12
+++ SpyMessageConsumer.java 2000/05/25 01:18:33 1.13
@@ -20,7 +20,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class SpyMessageConsumer
implements MessageConsumer
@@ -160,7 +160,7 @@
message.setSessionQueue(sessionQueue);
- } else if
(sessionQueue.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
+ } else if
(session.acknowledgeMode==Session.DUPS_OK_ACKNOWLEDGE) {
//DUPS_OK_ACKNOWLEDGE
} else {
//AUTO_ACKNOWLEDGE
1.6 +7 -3 spyderMQ/src/java/org/spyderMQ/SpyQueueConnection.java
Index: SpyQueueConnection.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueConnection.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SpyQueueConnection.java 2000/05/16 03:31:00 1.5
+++ SpyQueueConnection.java 2000/05/25 01:18:33 1.6
@@ -21,7 +21,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class SpyQueueConnection
extends SpyConnection
@@ -87,8 +87,7 @@
failureHandler(e,"Cannot create a temporary queue !");
return null;
}
- }
-
+ }
//Get a queue
Queue createQueue(String name) throws JMSException
@@ -103,4 +102,9 @@
}
}
+ synchronized void changeNumListening(int val)
+ {
+ Log.log("Connection: changeNumListening("+((val>0)?"+)":"-)"));
+ }
+
}
1.4 +96 -25 spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueReceiver.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyQueueReceiver.java 2000/05/24 19:17:18 1.3
+++ SpyQueueReceiver.java 2000/05/25 01:18:33 1.4
@@ -11,13 +11,14 @@
import javax.jms.Queue;
import javax.jms.Message;
import javax.jms.MessageListener;
+import java.util.Date;
/**
* This class implements javax.jms.QueueReceiver
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyQueueReceiver
extends SpyMessageConsumer
@@ -28,15 +29,15 @@
//The queue I registered
private Queue queue;
//Mode of this QueueReceiver
- boolean modeStop;
+ boolean listening;
// Constructor ---------------------------------------------------
- SpyQueueReceiver(SpyQueueSession s,Queue q,boolean stop)
+ SpyQueueReceiver(SpyQueueSession session,Queue queue)
{
- super(s);
- queue=q;
- modeStop=stop;
+ super(session);
+ this.queue=queue;
+ listening=false;
}
// Public --------------------------------------------------------
@@ -48,19 +49,12 @@
return queue;
}
- void setModeStop(boolean mode)
- {
- if (modeStop==mode) return;
- modeStop=mode;
- ((SpyQueueSession)session).notifyReceiverStopped(this,mode);
- }
-
public void close() throws JMSException
{
if (closed) return;
closed=true;
- setModeStop(true);
+ setListening(false);
}
//Overrides MessageConsumer
@@ -68,17 +62,91 @@
public Message receive() throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+
+ setListening(true);
+
+ synchronized (messages) {
+
+ //if the client follows the specification [4.4.6], he cannot
use this session
+ //to asynchronously receive a message or receive() in another
thread.
+ //If a message is already pending for this session, we can
immediatly deliver it
- //Not implemented yet
- return null;
+ while (true) {
+
+ if (closed) {
+ setListening(false);
+ return null;
+ }
+
+ if (!session.modeStop) {
+ Message mes=getMessage();
+ if (mes!=null) {
+ setListening(false);
+ return mes;
+ }
+ } else Log.log("the connection is stopped !");
+
+ try {
+ waitInReceive=true;
+ messages.wait();
+ } catch (InterruptedException e) {
+ } finally {
+ waitInReceive=false;
+ }
+
+ }
+
+ }
+
}
public Message receive(long timeOut) throws JMSException
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
+
+ if (timeOut==0) return receive();
+ long endTime=(new Date()).getTime()+timeOut;
- //Not implemented yet
- return null;
+ setListening(true);
+
+ synchronized (messages) {
+
+ //if the client respects the specification [4.4.6], he cannot
use this session
+ //to asynchronously receive a message or receive() from
another thread.
+ //If a message is already pending for this session, we can
deliver it
+
+ while (true) {
+
+ if (closed) {
+ setListening(false);
+ return null;
+ }
+
+ if (!session.modeStop) {
+ Message mes=getMessage();
+ if (mes!=null) {
+ setListening(false);
+ return mes;
+ }
+ } else Log.log("the connection is stopped !");
+
+ long att=endTime-((new Date()).getTime());
+ if (att<=0) {
+ setListening(false);
+ return null;
+ }
+
+ try {
+ waitInReceive=true;
+ messages.wait(att);
+ } catch (InterruptedException e) {
+ } finally {
+ waitInReceive=false;
+ }
+
+ }
+ }
+
}
public Message receiveNoWait() throws JMSException
@@ -93,14 +161,17 @@
{
if (closed) throw new IllegalStateException("The MessageConsumer is
closed");
- if (listener==null) {
- sessionQueue.notifyReceiverStopped(false);
- } else {
- sessionQueue.notifyReceiverStopped(true);
- }
-
messageListener=listener;
-
+ setListening(listener!=null);
+ }
+
+ //---
+
+ void setListening(boolean newvalue)
+ {
+ if (newvalue==listening) return;
+ listening=newvalue;
+ ((SpyQueueSession)session).notifyReceiverStopped(this,listening);
}
}
1.2 +13 -13 spyderMQ/src/java/org/spyderMQ/SpyQueueSender.java
Index: SpyQueueSender.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSender.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyQueueSender.java 2000/05/15 02:08:58 1.1
+++ SpyQueueSender.java 2000/05/25 01:18:33 1.2
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyQueueSender
extends SpyMessageProducer
@@ -27,31 +27,31 @@
// Attributes ----------------------------------------------------
//The session to which this sender is linked
- private SpyQueueSession mySession;
+ private SpyQueueSession session;
//The queue of this sender
- private Queue myQueue=null;
+ private Queue queue=null;
// Constructor ---------------------------------------------------
- SpyQueueSender(SpyQueueSession s,Queue q)
+ SpyQueueSender(SpyQueueSession session,Queue queue)
{
- mySession=s;
- myQueue=q;
+ this.session=session;
+ this.queue=queue;
}
// Public --------------------------------------------------------
public Queue getQueue() throws JMSException
{
- return myQueue;
+ return queue;
}
//Send methods
public void send(Message message) throws JMSException
{
- if (myQueue==null) throw new InvalidDestinationException("I do not
have a default Destination !");
- send(myQueue,message,defaultDeliveryMode,defaultPriority,defaultTTL);
+ if (queue==null) throw new InvalidDestinationException("I do not have
a default Destination !");
+ send(queue,message,defaultDeliveryMode,defaultPriority,defaultTTL);
}
public void send(Queue queue, Message message) throws JMSException
@@ -61,8 +61,8 @@
public void send(Message message, int deliveryMode, int priority, long
timeToLive) throws JMSException
{
- if (myQueue==null) throw new InvalidDestinationException("I do not
have a default Destination !");
- send(myQueue,message,deliveryMode,priority,timeToLive);
+ if (queue==null) throw new InvalidDestinationException("I do not have
a default Destination !");
+ send(queue,message,deliveryMode,priority,timeToLive);
}
public void send(Queue queue, Message mes, int deliveryMode, int priority,
long timeToLive) throws JMSException
@@ -82,7 +82,7 @@
message.setJMSExpiration(timeToLive+ts.getTime());
}
message.setJMSPriority(priority);
- message.setJMSMessageID(mySession.getNewMessageID());
+ message.setJMSMessageID(session.getNewMessageID());
//Set the properties and the message body in ReadOnly mode
//the client has to call clearProperties() and clearBody() if he wants
to modify those values
@@ -92,6 +92,6 @@
message.setJMSRedelivered(false);
//We must put a 'new message' in the Session's outgoing queue [3.9]
- mySession.sendMessage(message.myClone());
+ session.sendMessage(message.myClone());
}
}
1.9 +5 -5 spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpyQueueSession.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- SpyQueueSession.java 2000/05/24 19:17:18 1.8
+++ SpyQueueSession.java 2000/05/25 01:18:34 1.9
@@ -23,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.8 $
+ * @version $Revision: 1.9 $
*/
public class SpyQueueSession
extends SpySession
@@ -66,7 +66,7 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue,modeStop);
+ SpyQueueReceiver receiver=new SpyQueueReceiver(this,queue);
SessionQueue sessionQueue=addConsumer(queue,receiver);
return receiver;
@@ -84,7 +84,6 @@
{
if (closed) throw new IllegalStateException("The session is closed");
- //Not implemented yet
return new SpyQueueSender(this,queue);
}
@@ -149,9 +148,10 @@
}
//One receiver is changing its mode
- synchronized void notifyReceiverStopped(SpyQueueReceiver receiver,boolean
newMode)
+ synchronized void notifyReceiverStopped(SpyQueueReceiver receiver,boolean mode)
{
- //if (newMode==false)
+ Log.log("Session:
notifyReceiverStopped(receiver="+receiver+",mode="+mode+")");
+ receiver.sessionQueue.changeNumListening(mode?1:-1);
}
}
1.17 +2 -2 spyderMQ/src/java/org/spyderMQ/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/SpySession.java,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- SpySession.java 2000/05/24 19:17:18 1.16
+++ SpySession.java 2000/05/25 01:18:34 1.17
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.16 $
+ * @version $Revision: 1.17 $
*/
public class SpySession
implements Runnable, Session
@@ -384,7 +384,7 @@
synchronized (destinations) {
SessionQueue sub=(SessionQueue)destinations.get(dest);
if (sub==null) {
- sub=new SessionQueue(transacted,acknowledgeMode);
+ sub=new SessionQueue(this);
sub.addConsumer(who);
HashMap newDestinations=(HashMap)destinations.clone();
newDestinations.put(dest,sub);
1.1 spyderMQ/src/java/org/spyderMQ/ConnectionQueue.java
Index: ConnectionQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq;
import javax.jms.JMSException;
import javax.jms.Destination;
import java.util.HashSet;
import java.util.Iterator;
/**
* This class is a useful class for the SpyConnection
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class ConnectionQueue
{
// Attributes ----------------------------------------------------
//My destination
Destination destination;
//the SpySessions linked to this queue
public HashSet subscribers;
//Number of listening sessions
int NumListeningSessions;
//My SpyConnection
SpyConnection connection;
// Constructor ---------------------------------------------------
ConnectionQueue(Destination destination,SpyConnection connection)
{
subscribers=new HashSet();
this.connection=connection;
this.destination=destination;
NumListeningSessions=0;
}
// Package protected ---------------------------------------------
synchronized void addSession(SpySession session)
{
HashSet newSet=(HashSet)subscribers.clone();
newSet.add(session);
subscribers=newSet;
}
synchronized boolean removeSession(SpySession session)
{
HashSet newSet=(HashSet)subscribers.clone();
newSet.remove(session);
subscribers=newSet;
return subscribers.size()==0;
}
}