User: norbert
Date: 00/05/30 15:10:20
Modified: src/java/org/spyderMQ/distributed/server
ConnectionReceiverRMIImpl.java
DistributedJMSServerRMI.java
DistributedJMSServerRMIImpl.java
Log:
The P2P system
Revision Changes Path
1.11 +49 -19
spyderMQ/src/java/org/spyderMQ/distributed/server/ConnectionReceiverRMIImpl.java
Index: ConnectionReceiverRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/distributed/server/ConnectionReceiverRMIImpl.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- ConnectionReceiverRMIImpl.java 2000/05/25 01:18:35 1.10
+++ ConnectionReceiverRMIImpl.java 2000/05/30 22:10:20 1.11
@@ -12,7 +12,10 @@
import org.spydermq.ConnectionQueue;
import org.spydermq.SpyMessage;
import org.spydermq.SpySession;
+import org.spydermq.SessionQueue;
import org.spydermq.SpyDestination;
+import org.spydermq.SpyTopicConnection;
+import org.spydermq.SpyQueueSession;
import org.spydermq.Log;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
@@ -25,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
*/
public class ConnectionReceiverRMIImpl extends UnicastRemoteObject implements
ConnectionReceiverRMI
{
@@ -58,27 +61,54 @@
Log.log("ConnectionReceiver: Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
- //Get the set of subscribers for this Destination
+ if (connection instanceof SpyTopicConnection) {
- ConnectionQueue connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) return;
+ //Get the set of subscribers for this Topic
+
+ ConnectionQueue connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
+ if (connectionQueue==null) return;
- Iterator i=connectionQueue.subscribers.iterator();
-
- while (i.hasNext()) {
-
- SpySession session=(SpySession)i.next();
-
- //add the new message to the session's queue
- session.dispatchMessage(dest,mes);
-
- //There is work to do...
- synchronized (session.thread) {
- //We should not have to wait for the lock...
- session.thread.notify();
- }
+ Iterator i=connectionQueue.subscribers.iterator();
+
+ while (i.hasNext()) {
+
+ SpySession session=(SpySession)i.next();
+
+ //add the new message to the session's queue
+ session.dispatchMessage(dest,mes);
+
+ //There is work to do...
+ synchronized (session.thread) {
+ //We should not have to wait for the lock...
+ session.thread.notify();
+ }
+ }
+
+ } else {
+
+ //Find one session waiting for this Queue
+
+ ConnectionQueue connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
+ if (connectionQueue==null) throw new JMSException("There is no receiver for this queue !"); //We should catch this error in the JMSServerQueue object
+ if (connectionQueue.NumListeningSessions==0) throw new JMSException("There is no receiver for this queue !"); //We should catch this error in the JMSServerQueue object
+
+ Iterator i=connectionQueue.subscribers.iterator();
+ SpySession session=null;
+ SessionQueue sq=null;
+ while (i.hasNext()) {
+ session=(SpySession)i.next();
+ sq=(SessionQueue)session.destinations.get(dest);
+ if (sq.NumListeningSubscribers!=0) break;
+ }
+ if (sq==null||sq.NumListeningSubscribers==0) {
+ connectionQueue.NumListeningSessions=0;
+ Log.log("WARNING: The listeners count was invalid !");
+ throw new JMSException("There is no receiver for this queue !"); //We should catch this error in the JMSServerQueue object
+ }
+
+ sq.dispatchMessage(dest,mes);
+
}
-
}
public void receiveMultiple(Destination dest,SpyMessage mes[]) throws
JMSException
1.13 +3 -1
spyderMQ/src/java/org/spyderMQ/distributed/server/DistributedJMSServerRMI.java
Index: DistributedJMSServerRMI.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/distributed/server/DistributedJMSServerRMI.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- DistributedJMSServerRMI.java 2000/05/19 19:28:51 1.12
+++ DistributedJMSServerRMI.java 2000/05/30 22:10:20 1.13
@@ -24,7 +24,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote
{
@@ -42,4 +42,6 @@
public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException, RemoteException;
public void checkID(String ID) throws JMSException, RemoteException;
public SpyMessage queueReceiveNoWait(Queue queue) throws Exception,
RemoteException;
+ public void connectionListening(boolean mode,Destination dest,SpyDistributedConnection dc) throws Exception, RemoteException;
+
}
1.15 +7 -1
spyderMQ/src/java/org/spyderMQ/distributed/server/DistributedJMSServerRMIImpl.java
Index: DistributedJMSServerRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/distributed/server/DistributedJMSServerRMIImpl.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- DistributedJMSServerRMIImpl.java 2000/05/19 19:28:51 1.14
+++ DistributedJMSServerRMIImpl.java 2000/05/30 22:10:20 1.15
@@ -24,7 +24,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.14 $
+ * @version $Revision: 1.15 $
*/
public class DistributedJMSServerRMIImpl extends UnicastRemoteObject implements
DistributedJMSServerRMI
{
@@ -106,5 +106,11 @@
{
return server.queueReceiveNoWait(queue);
}
+
+ public void connectionListening(boolean mode,Destination dest,SpyDistributedConnection dc) throws JMSException
+ {
+ server.connectionListening(mode,dest,dc);
+ }
+
}