User: norbert 
  Date: 00/05/30 15:10:20

  Modified:    src/java/org/spyderMQ/distributed/server
                        ConnectionReceiverRMIImpl.java
                        DistributedJMSServerRMI.java
                        DistributedJMSServerRMIImpl.java
  Log:
  The P2P system
  
  Revision  Changes    Path
  1.11      +49 -19    spyderMQ/src/java/org/spyderMQ/distributed/server/ConnectionReceiverRMIImpl.java
  
  Index: ConnectionReceiverRMIImpl.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/distributed/server/ConnectionReceiverRMIImpl.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- ConnectionReceiverRMIImpl.java    2000/05/25 01:18:35     1.10
  +++ ConnectionReceiverRMIImpl.java    2000/05/30 22:10:20     1.11
  @@ -12,7 +12,10 @@
   import org.spydermq.ConnectionQueue;
   import org.spydermq.SpyMessage;
   import org.spydermq.SpySession;
  +import org.spydermq.SessionQueue;
   import org.spydermq.SpyDestination;
  +import org.spydermq.SpyTopicConnection;
  +import org.spydermq.SpyQueueSession;
   import org.spydermq.Log;
   import java.rmi.RemoteException; 
   import java.rmi.server.UnicastRemoteObject;
  @@ -25,7 +28,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.10 $
  + *   @version $Revision: 1.11 $
    */
   public class ConnectionReceiverRMIImpl extends UnicastRemoteObject implements ConnectionReceiverRMI
   {
  @@ -58,27 +61,54 @@
                
                Log.log("ConnectionReceiver: Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
                
  -             //Get the set of subscribers for this Destination
  +             if (connection instanceof SpyTopicConnection) {
                
  -             ConnectionQueue connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
  -             if (connectionQueue==null) return;
  +                     //Get the set of subscribers for this Topic
  +             
  +                     ConnectionQueue connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
  +                     if (connectionQueue==null) return;
        
  -             Iterator i=connectionQueue.subscribers.iterator();
  -                             
  -             while (i.hasNext()) {                                                  
         
  -                             
  -                     SpySession session=(SpySession)i.next();
  -                             
  -                     //add the new message to the session's queue
  -                     session.dispatchMessage(dest,mes);
  -                             
  -                     //There is work to do... 
  -                     synchronized (session.thread) {
  -                             //We should not have to wait for the lock...
  -                             session.thread.notify();
  -                     }                       
  +                     Iterator i=connectionQueue.subscribers.iterator();
  +                                     
  +                     while (i.hasNext()) {                                          
                 
  +                                     
  +                             SpySession session=(SpySession)i.next();
  +                                     
  +                             //add the new message to the session's queue
  +                             session.dispatchMessage(dest,mes);
  +                                     
  +                             //There is work to do... 
  +                             synchronized (session.thread) {
  +                                     //We should not have to wait for the lock...
  +                                     session.thread.notify();
  +                             }                       
  +                     }
  +                     
  +             } else {
  +                     
  +                     //Find one session waiting for this Queue
  +
  +                     ConnectionQueue connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
  +                     if (connectionQueue==null) throw new JMSException("There is no receiver for this queue !");      //We should catch this error in the JMSServerQueue object
  +                     if (connectionQueue.NumListeningSessions==0) throw new JMSException("There is no receiver for this queue !"); //We should catch this error in the JMSServerQueue object
  +                     
  +                     Iterator i=connectionQueue.subscribers.iterator();
  +                     SpySession session=null;
  +                     SessionQueue sq=null;
  +                     while (i.hasNext()) {
  +                             session=(SpySession)i.next();
  +                             sq=(SessionQueue)session.destinations.get(dest);
  +                             if (sq.NumListeningSubscribers!=0) break;
  +                     }
  +                     if (sq==null||sq.NumListeningSubscribers==0) {
  +                             connectionQueue.NumListeningSessions=0;
  +                             Log.log("WARNING: The listeners count was invalid !");
  +                             throw new JMSException("There is no receiver for this queue !"); //We should catch this error in the JMSServerQueue object
  +                     }
  +
  +                     sq.dispatchMessage(dest,mes);
  +                     
                }
  -             
        } 
   
        public void receiveMultiple(Destination dest,SpyMessage mes[]) throws JMSException
  
  
  
  1.13      +3 -1      spyderMQ/src/java/org/spyderMQ/distributed/server/DistributedJMSServerRMI.java
  
  Index: DistributedJMSServerRMI.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/distributed/server/DistributedJMSServerRMI.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- DistributedJMSServerRMI.java      2000/05/19 19:28:51     1.12
  +++ DistributedJMSServerRMI.java      2000/05/30 22:10:20     1.13
  @@ -24,7 +24,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote
   {  
  @@ -42,4 +42,6 @@
        public void deleteTemporaryDestination(SpyDestination dest) throws JMSException, RemoteException;
        public void checkID(String ID) throws JMSException, RemoteException;
        public SpyMessage queueReceiveNoWait(Queue queue) throws Exception, RemoteException;
  +     public void connectionListening(boolean mode,Destination dest,SpyDistributedConnection dc) throws Exception, RemoteException;
  +
   }
  
  
  
  1.15      +7 -1      spyderMQ/src/java/org/spyderMQ/distributed/server/DistributedJMSServerRMIImpl.java
  
  Index: DistributedJMSServerRMIImpl.java
  ===================================================================
  RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spyderMQ/distributed/server/DistributedJMSServerRMIImpl.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- DistributedJMSServerRMIImpl.java  2000/05/19 19:28:51     1.14
  +++ DistributedJMSServerRMIImpl.java  2000/05/30 22:10:20     1.15
  @@ -24,7 +24,7 @@
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.14 $
  + *   @version $Revision: 1.15 $
    */
   public class DistributedJMSServerRMIImpl extends UnicastRemoteObject implements DistributedJMSServerRMI
   { 
  @@ -106,5 +106,11 @@
        {
                return server.queueReceiveNoWait(queue);
        }
  +     
  +     public void connectionListening(boolean mode,Destination dest,SpyDistributedConnection dc) throws JMSException
  +     {
  +             server.connectionListening(mode,dest,dc);
  +     }
  +
   
   }
  
  
  

Reply via email to