User: hiram   
  Date: 00/11/19 12:00:04

  Modified:    src/java/org/spydermq/distributed/server
                        ConnectionReceiverOIL.java
                        ConnectionReceiverOILClient.java
                        ConnectionReceiverRMIImpl.java
                        DistributedJMSServerOIL.java
                        DistributedJMSServerOILClient.java
                        DistributedJMSServerRMI.java
                        DistributedJMSServerRMIImpl.java
                        ConnectionReceiverUIL.java
                        ConnectionReceiverUILClient.java
                        DistributedJMSServerUIL.java
                        DistributedJMSServerUILClient.java
                        DistributedJMSServerUILMBean.java
  Log:
  Commiting several changes:
   - Removed ConnectionQueue and SessionQueue.  All consumer managment is done at the 
SpyConnection now.
   - Unacknowledged messages are maintained on the server side now. 
(JMSServerQueueReceiver)
   - Acknowlegment messages are sent to the server from the client.
   - Optimized the OIL and UIL transports by caching the DistributedConnection on the 
JMSServer when the ConnectionReciver is setup.
   - Cleaned up the OIL by only using the Object(Output/Input) streams instead of both 
Object(Output/Input) and Buffered(Output/Input) streams.
   - A QueueReceiver now does a request for a single message on a receive() method 
instead turning on/off listen to get a message.
   - For the OIL and UIL, a connection failure/termination is now handled gracefully  
(if connectionCLosing() was called, no errors shown, if it was not called and the 
connection failed, we call it).
  
  Revision  Changes    Path
  1.8       +67 -134   
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
  
  Index: ConnectionReceiverOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
  retrieving revision 1.7
  retrieving revision 1.8
  diff -u -r1.7 -r1.8
  --- ConnectionReceiverOIL.java        2000/11/14 06:03:19     1.7
  +++ ConnectionReceiverOIL.java        2000/11/19 20:00:01     1.8
  @@ -9,10 +9,10 @@
   import javax.jms.Destination;
   import javax.jms.JMSException;
   import org.spydermq.SpyConnection;
  -import org.spydermq.ConnectionQueue;
  +
   import org.spydermq.SpyMessage;
   import org.spydermq.SpySession;
  -import org.spydermq.SessionQueue;
  +
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyTopicConnection;
   import org.spydermq.SpyQueueSession;
  @@ -35,13 +35,7 @@
   import java.io.ObjectOutputStream;
   import java.io.IOException;
   
  -/**
  - *   The OIL implementation of the ConnectionReceiver object
  - *      
  - *   @author Norbert Lataille ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.7 $
  - */
  +import org.spydermq.SpyMessageConsumer;
   public class ConnectionReceiverOIL 
        implements Runnable, ConnectionReceiverSetup
   {
  @@ -83,27 +77,15 @@
        {
                Socket socket = null;
                int code = 0;
  -             //InputStream is=null;
  -             //OutputStream os=null;
  -             BufferedInputStream is=null;
  -             BufferedOutputStream os=null;
                ObjectOutputStream out=null;
                ObjectInputStream in=null;
                      
                try {
  +                     
                        socket = serverSocket.accept();
  -               
  -                     //We have our connection to the broker... there's no need to 
wait for another connection
  -                     //new Thread(this).start();
  -
  -                     //is = socket.getInputStream();
  -                     //os = socket.getOutputStream();
  -                     is = new BufferedInputStream(socket.getInputStream());
  -                     os = new BufferedOutputStream(socket.getOutputStream());
  -
  -                     out = new ObjectOutputStream(os);
  +                     out = new ObjectOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
                        out.flush();
  -                     in = new ObjectInputStream(is);
  +                     in = new ObjectInputStream(new 
BufferedInputStream(socket.getInputStream()));
   
                } catch (IOException e) {
                        failure("Initialisation",e);
  @@ -113,7 +95,7 @@
                while (true) {
   
                        try {
  -                             code=is.read();
  +                             code=in.readByte();
                        } catch (IOException e) {
                                failure("Command read",e);
                                e.printStackTrace();
  @@ -146,24 +128,24 @@
                                //Everthing was OK
                                
                                try {
  -                                     os.write(0);
  -                                     os.flush();
  +                                     out.writeByte(0);
  +                                     out.flush();
                                } catch (IOException e) {
                                        failure("Result write",e);
                                        return;                                 
                                }
                                
  -                             } catch (Exception e) {
  +                     } catch (Exception e) {
   
                                try {
                                        if( e instanceof NoReceiverException ) {
  -                                             os.write(2);
  +                                             out.writeByte(2);
                                        } else {
  -                                             os.write(1);
  +                                             out.writeByte(1);
                                        }
                                        out.writeObject(e.getMessage());
  +                                     out.flush();
                                        out.flush();
  -                                     os.flush();
                                } catch (IOException e2) {
                                        failure("Result write",e2);
                                        return;                                 
  @@ -193,140 +175,91 @@
        }
        
        // ---
  -     
  +
        //<DEBUG>
  -     
  +
        /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
        {
                connection.rec++;
        }
  -     
  +
        public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
        {
                connection.rec++;
        }*/
  -     
  +
        //</DEBUG>
  -     
  -     
  +
        //A message has arrived for this Connection, We have to dispatch it to the 
sessions
  -    public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  -     {               
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             
  -             Log.log("ConnectionReceiver: 
Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
  -             
  +     public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +
  +             Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() + 
",Mes=" + mes.toString() + ")");
  +
                if (connection instanceof SpyTopicConnection) {
  -             
  +
                        //Get the set of subscribers for this Topic
  -             
  -                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);             
  -                     if (connectionQueue==null) return;
  -     
  -                     Iterator i=connectionQueue.subscribers.iterator();
  -                                     
  -                     while (i.hasNext()) {                                          
                 
  -                                     
  -                             SpySession session=(SpySession)i.next();
  -                                     
  -                             //add the new message to the session's queue
  -                             session.dispatchMessage(dest,mes);
  -                                     
  +                     SpyMessageConsumer consumers[] = connection.getConsumers(dest);
  +
  +                     for (int i = 0; i < consumers.length; i++) {
  +
  +                             //add the new message to the consumer's queue
  +                             consumers[i].addMessage(mes);
  +
                                //There is work to do... 
  -                             session.mutex.notifyLock();
  +                             consumers[i].session.mutex.notifyLock();
                        }
  +
                } else {
  -                     
  -                     while (true) {
  -                             
  -                             SessionQueue sq=null;
   
  -                             try {
  -                     
  -                                     //Find one session waiting for this Queue
  -                                     if (connection.modeStop) throw new 
Exception("This connection is stopped !");
  -                                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
  -                                     if (connectionQueue==null) throw new 
Exception("There is no connectionQueue for this destination !");
  -                                     
  -                                     synchronized (connectionQueue) {
  -                                             
  -                                             //Find a SessionQueue
  -                                             if 
(connectionQueue.NumListeningSessions==0) throw new NoReceiverException("There are no 
listening sessions for this destination !");
  -                     
  -                                             Iterator 
i=connectionQueue.subscribers.iterator();
  -                                             while (i.hasNext()) {
  -                                                     SpySession 
session=(SpySession)i.next();
  -                                                     
sq=(SessionQueue)session.destinations.get(dest);
  -                                                     if 
(sq.NumListeningSubscribers!=0) break;
  -                                             }
  -                                             if 
(sq==null||sq.NumListeningSubscribers==0) {
  -                                                     Log.error("FIXME: The 
listeners count was invalid !");
  -                                                     throw new 
NoReceiverException("There are no listening sessions for this destination !");
  -                                             }
  -                                     
  -                                             //Try with this sessionQueue
  -                                             Log.log("Dispatching to SessionQueue: 
"+mes);
  -                                             sq.dispatchMessage(dest,mes);
  -                             
  -                                             //Our work is done here
  -                                             break;
  -                                     }
  +                     //Find one session waiting for this Queue
  +                     if (connection.modeStop)
  +                             throw new JMSException("This connection is stopped !");
  +
  +                     SpyMessageConsumer consumer = 
connection.pickListeningConsumer(dest);
  +                     if (consumer == null)
  +                             throw new NoReceiverException("There are no listening 
sessions for this destination !");
  +
  +                     //Try with this sessionQueue
  +                     Log.log("Dispatching to SessionQueue: " + mes);
  +                     ((org.spydermq.SpyQueueReceiver)consumer).dispatchMessage( mes 
);
   
  -                             } catch (NoReceiverException e) {
  -                                     //This SessionQueue should not have been 
registered !
  -                                     throw e;
  -                             } catch (Exception e) {
  -                                     //This error is non-recoverable : we must 
unregister from this queue
  -                                     //Let the JMSServerQueue do its work
  -                                     Log.log(e);
  -                                     throw new JMSException("There are no listening 
sessions in this connection");
  -                             }
  -                     }
  -                     
                }
  -                     
  -     } 
   
  -    public void receiveMultiple(SpyDestination dest,int nb,ObjectInputStream in) 
throws Exception
  -     {               
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             
  +     }
  + 
  +
  +     public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in) 
throws Exception {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +
                Log.log("ConnectionReceiver: ReceiveMultiple()");
  -             
  +
                if (connection instanceof SpyTopicConnection) {
  -             
  +
                        //Get the set of subscribers for this Topic
  -             
  -                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);             
  -                     if (connectionQueue==null) return;
  -     
  -                     for(int val=0;val<nb;val++) {
  -                     
  -                             SpyMessage mes=(SpyMessage)in.readObject();
  -                             
  -                             //NL: i is a short-lived object. Try to "group" 
messages in an pre-allocated/peristant
  -                             //array and apply the same iterator on this array
  -                             Iterator i=connectionQueue.subscribers.iterator();
  -                                     
  -                             while (i.hasNext()) {                                  
                         
  -                             
  -                                     SpySession session=(SpySession)i.next();
  -                                     
  -                                     //add the new message to the session's queue
  -                                     session.dispatchMessage(dest,mes);
  -                             
  +                     SpyMessageConsumer consumers[] = connection.getConsumers(dest);
  +
  +                     for (int val = 0; val < nb; val++) {
  +                             SpyMessage mes = (SpyMessage) in.readObject();
  +
  +                             for (int i = 0; i < consumers.length; i++) {
  +
  +                                     //add the new message to the consumer's queue
  +                                     consumers[i].addMessage(mes);
  +
                                        //There is work to do... 
  -                                     session.mutex.notifyLock();
  +                                     consumers[i].session.mutex.notifyLock();
                                }
                        }
  -                     
                } else {
  -                     throw new Exception("Multiple dispatch for a Queue");          
         
  +                     throw new Exception("Multiple dispatch for a Queue");
                }
        } 
        
        
  -    public void close() throws Exception
  +     public void close() throws Exception
        {
                closed=true;            
        }
  
  
  
  1.10      +20 -22    
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
  
  Index: ConnectionReceiverOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- ConnectionReceiverOILClient.java  2000/11/14 06:05:05     1.9
  +++ ConnectionReceiverOILClient.java  2000/11/19 20:00:01     1.10
  @@ -27,8 +27,8 @@
        static final int CLOSE = 4;             
        
        private transient Socket socket;
  -     private transient BufferedInputStream is;
  -     private transient BufferedOutputStream os;
  +
  +
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
        
  @@ -46,11 +46,9 @@
        {
                try {                   
                        socket=new Socket(addr,port);
  -                     is = new BufferedInputStream(socket.getInputStream());
  -                     os = new BufferedOutputStream(socket.getOutputStream());
  -                     in=new ObjectInputStream(is);
  -                     out=new ObjectOutputStream(os);
  -                     os.flush();
  +                     out=new ObjectOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
  +                     out.flush();
  +                     in=new ObjectInputStream(new 
BufferedInputStream(socket.getInputStream()));
                } catch (Exception e) {                 
                        Log.error(e);
                        throw new RemoteException("Cannot connect to the 
ConnectionReceiver/Server");
  @@ -61,8 +59,8 @@
        {
                Exception throwException=null;
                try {
  -                     os.flush();
  -                     int val=is.read();
  +                     out.flush();
  +                     int val=in.readByte();
                        switch(val) {
                        case 1:                         
                                String st=(String)in.readObject();
  @@ -81,20 +79,20 @@
                if( throwException != null )
                        throw throwException;
        }
  -
  -    public void receive(SpyDestination dest,SpyMessage mes) throws Exception
  +     
  +     public void receive(SpyDestination dest,SpyMessage mes) throws Exception
        {               
  -        if (socket==null) createConnection();
  -        os.write(RECEIVE);
  -        out.writeObject(dest);
  -        out.writeObject(mes);
  -        waitAnswer();
  +             if (socket==null) createConnection();
  +             out.writeByte(RECEIVE);
  +             out.writeObject(dest);
  +             out.writeObject(mes);
  +             waitAnswer();
        }
                
  -    public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
Exception       
  +     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
Exception      
        {
                if (socket==null) createConnection();
  -             os.write(RECEIVE_MULTIPLE);
  +             out.writeByte(RECEIVE_MULTIPLE);
                out.writeObject(dest);
                out.writeInt(mes.length);
                for(int i=0;i<mes.length;i++)
  @@ -102,18 +100,18 @@
                waitAnswer();
        }       
        
  -    public void deleteTemporaryDestination(SpyDestination dest) throws Exception
  +     public void deleteTemporaryDestination(SpyDestination dest) throws Exception
        {
                if (socket==null) createConnection();
  -             os.write(DELETE_TEMPORARY_DESTINATION);
  +             out.writeByte(DELETE_TEMPORARY_DESTINATION);
                out.writeObject(dest);
                waitAnswer();
        }       
  -    
  +     
        public void close() throws Exception
        {
                if (socket==null) createConnection();
  -             os.write(CLOSE);
  +             out.writeByte(CLOSE);
                waitAnswer();
        }
        
  
  
  
  1.12      +57 -90    
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
  
  Index: ConnectionReceiverRMIImpl.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- ConnectionReceiverRMIImpl.java    2000/06/19 21:52:00     1.11
  +++ ConnectionReceiverRMIImpl.java    2000/11/19 20:00:01     1.12
  @@ -9,10 +9,10 @@
   import javax.jms.Destination;
   import javax.jms.JMSException;
   import org.spydermq.SpyConnection;
  -import org.spydermq.ConnectionQueue;
  +
   import org.spydermq.SpyMessage;
   import org.spydermq.SpySession;
  -import org.spydermq.SessionQueue;
  +
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyTopicConnection;
   import org.spydermq.SpyQueueSession;
  @@ -26,12 +26,12 @@
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
   
  -/**
  +import org.spydermq.SpyMessageConsumer;/**
    *   The RMI implementation of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.11 $
  + *   @version $Revision: 1.12 $
    */
   public class ConnectionReceiverRMIImpl 
        extends UnicastRemoteObject 
  @@ -58,107 +58,74 @@
        {
                this.connection=connection;
        }
  -     
  -     //<DEBUG>
        
  -     /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  -     {
  -             connection.rec++;       
  -     }
  +     //A message has arrived for this Connection, We have to dispatch it to the 
sessions
  +     public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
   
  -     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
  -     {
  -             connection.rec++;
  -     }*/
  -             
  -     //</DEBUG>
  +             Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() + 
",Mes=" + mes.toString() + ")");
   
  -     
  -     //A message has arrived for this Connection, We have to dispatch it to the 
sessions
  -    public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  -     {
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             
  -             Log.log("ConnectionReceiver: 
Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
  -             
                if (connection instanceof SpyTopicConnection) {
  -             
  +
                        //Get the set of subscribers for this Topic
  -             
  -                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);             
  -                     if (connectionQueue==null) return;
  -     
  -                     Iterator i=connectionQueue.subscribers.iterator();
  -                                     
  -                     while (i.hasNext()) {                                          
                 
  -                                     
  -                             SpySession session=(SpySession)i.next();
  -                                     
  -                             //add the new message to the session's queue
  -                             session.dispatchMessage(dest,mes);
  -                                     
  +                     SpyMessageConsumer consumers[] = connection.getConsumers(dest);
  +
  +                     for (int i = 0; i < consumers.length; i++) {
  +
  +                             //add the new message to the consumer's queue
  +                             consumers[i].addMessage(mes);
  +
                                //There is work to do... 
  -                             session.mutex.notifyLock();
  +                             consumers[i].session.mutex.notifyLock();
                        }
  -                     
  +
                } else {
  -                     
  -                     while (true) {
  -                             
  -                             SessionQueue sq=null;
  -
  -                             try {
  -                     
  -                                     //Find one session waiting for this Queue
  -                                     if (connection.modeStop) throw new 
Exception("This connection is stopped !");
  -                                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
  -                                     if (connectionQueue==null) throw new 
Exception("There is no connectionQueue for this destination !");
  -                                     
  -                                     synchronized (connectionQueue) {
  -                                             
  -                                             //Find a SessionQueue
  -                                             if 
(connectionQueue.NumListeningSessions==0) throw new Exception("There are no listening 
sessions for this destination !");
  -                     
  -                                             Iterator 
i=connectionQueue.subscribers.iterator();
  -                                             while (i.hasNext()) {
  -                                                     SpySession 
session=(SpySession)i.next();
  -                                                     
sq=(SessionQueue)session.destinations.get(dest);
  -                                                     if 
(sq.NumListeningSubscribers!=0) break;
  -                                             }
  -                                             if 
(sq==null||sq.NumListeningSubscribers==0) {
  -                                                     Log.error("FIXME: The 
listeners count was invalid !");
  -                                                     throw new Exception("There are 
no listening sessions for this destination !");
  -                                             }
  -                                     
  -                                             //Try with this sessionQueue
  -                                             sq.dispatchMessage(dest,mes);
  -                             
  -                                             //Our work is done here
  -                                             break;
  -                                     }
  -
  -                             } catch (NoReceiverException e) {
  -                                     //This SessionQueue should not have been 
registered !
  -                                     continue;
  -                             } catch (Exception e) {
  -                                     //This error is non-recoverable : we must 
unregister from this queue
  -                                     //Let the JMSServerQueue do its work
  -                                     Log.log(e);
  -                                     throw new NoReceiverException("There are no 
listening sessions in this connection");
  -                             }
  -                     }
  -                     
  +
  +                     //Find one session waiting for this Queue
  +                     if (connection.modeStop)
  +                             throw new JMSException("This connection is stopped !");
  +
  +                     SpyMessageConsumer consumer = 
connection.pickListeningConsumer(dest);
  +                     if (consumer == null)
  +                             throw new NoReceiverException("There are no listening 
sessions for this destination !");
  +
  +                     //Try with this sessionQueue
  +                     Log.log("Dispatching to SessionQueue: " + mes);
  +                     ((org.spydermq.SpyQueueReceiver) 
consumer).dispatchMessage(mes);
  +
                }
        } 
   
        public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
        {
  -             for(int i=0;i<mes.length;i++) {
  -                     receive(dest,mes[i]);
  -             }
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +
  +             Log.log("ConnectionReceiver: ReceiveMultiple()");
  +
  +             if (connection instanceof SpyTopicConnection) {
  +
  +                     //Get the set of subscribers for this Topic
  +                     SpyMessageConsumer consumers[] = connection.getConsumers(dest);
  +
  +                     for(int i=0;i<mes.length;i++) {
  +
  +                             for (int j = 0; j < consumers.length; j++) {
  +
  +                                     //add the new message to the consumer's queue
  +                                     consumers[j].addMessage(mes[i]);
  +
  +                                     //There is work to do... 
  +                                     consumers[j].session.mutex.notifyLock();
  +                             }
  +                     }
  +             } else {
  +                     throw new JMSException("Multiple dispatch for a Queue");
  +             }               
        }
        
  -    public void close() throws Exception
  +     public void close() throws Exception
        {
                closed=true;            
        }
  
  
  
  1.5       +61 -46    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
  
  Index: DistributedJMSServerOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- DistributedJMSServerOIL.java      2000/11/04 19:24:47     1.4
  +++ DistributedJMSServerOIL.java      2000/11/19 20:00:01     1.5
  @@ -24,7 +24,7 @@
   import java.io.BufferedInputStream;
   import java.io.IOException;
   
  -public class DistributedJMSServerOIL
  +import org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerOIL
        implements Runnable, DistributedJMSServerSetup, DistributedJMSServerOILMBean
   {
   
  @@ -53,8 +53,10 @@
        static final int ConnectionClosing = 9;
        static final int DeleteTemporaryDestination = 10;
        static final int CheckID = 11;
  -     static final int QueueReceiveNoWait = 12;
  +     static final int QueueReceive = 12;     
        static final int ConnectionListening = 13;
  +     static final int Acknowledge = 14;      
  +     static final int SetSpyDistributedConnection = 15;
        
        private ServerSocket serverSocket;
   
  @@ -72,35 +74,35 @@
        {
                Socket socket = null;
                int code = 0;
  -             BufferedInputStream is=null;
  -             BufferedOutputStream os=null;
                ObjectOutputStream out=null;
                ObjectInputStream in=null;
  +             SpyDistributedConnection spyDistributedConnection=null;
  +             boolean closed = false;
                      
                try {
                        socket = serverSocket.accept();
                  
                        new Thread(this).start();
  -
  -                     is = new BufferedInputStream(socket.getInputStream());
  -                     os = new BufferedOutputStream(socket.getOutputStream());
  -                     out = new ObjectOutputStream(os);
  -                     os.flush();
  -                     in = new ObjectInputStream(is);
  +                     
  +                     out = new ObjectOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
  +                     out.flush();
  +                     in = new ObjectInputStream(new 
BufferedInputStream(socket.getInputStream()));
   
                } catch (IOException e) {
                        failure("Initialisation",e);
                        return;
                }
   
  -             while (true) {
  +             while (!closed) {
   
                        try {
  -                             code=is.read();         
  +                             code=in.readByte();             
                        } catch (IOException e) {
  +                             if( closed )
  +                                     break;
                                Log.notice("Command read");
  -                Log.notice(e);
  -                             return;
  +                             Log.notice(e);
  +                             break;
                        }
                
                        try {
  @@ -115,11 +117,11 @@
                                        case NewMessage:                               
                 
                                                
newMessage((String)in.readObject(),in.readInt(),in);
                                                break;
  -                                     case Subscribe: 
  -                                             
server.subscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
  +                                     case Subscribe:
  +                                             
server.subscribe((Destination)in.readObject(),spyDistributedConnection);
                                                break;
                                        case Unsubscribe: 
  -                                             
server.unsubscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
  +                                             
server.unsubscribe((Destination)in.readObject(),spyDistributedConnection);
                                                break;
                                        case CreateTopic: 
                                                
result=(Topic)server.createTopic((String)in.readObject());
  @@ -128,13 +130,14 @@
                                                
result=(Queue)server.createQueue((String)in.readObject());
                                                break;
                                        case GetTemporaryTopic:
  -                                             
result=(TemporaryTopic)server.getTemporaryTopic((SpyDistributedConnection)in.readObject());
  +                                             
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
                                                break;
                                        case GetTemporaryQueue: 
  -                                             
result=(TemporaryQueue)server.getTemporaryQueue((SpyDistributedConnection)in.readObject());
  +                                             
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
                                                break;
                                        case ConnectionClosing: 
  -                                             
server.connectionClosing((SpyDistributedConnection)in.readObject(),null);
  +                                             
server.connectionClosing(spyDistributedConnection,null);
  +                                             closed=true;
                                                break;
                                        case DeleteTemporaryDestination: 
                                                
server.deleteTemporaryDestination((SpyDestination)in.readObject());
  @@ -142,11 +145,20 @@
                                        case CheckID: 
                                                
server.checkID((String)in.readObject());
                                                break;
  -                                     case QueueReceiveNoWait:
  -                                             
result=server.queueReceiveNoWait((Queue)in.readObject());
  +                                     case QueueReceive:
  +                                             
result=server.queueReceive((Queue)in.readObject(), 
in.readLong(),spyDistributedConnection);
                                                break;
                                        case ConnectionListening: 
  -                                             
server.connectionListening(((is.read())==1),(Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
  +                                             
server.connectionListening(in.readBoolean(),(Destination)in.readObject(),spyDistributedConnection);
  +                                             break;
  +                                     case Acknowledge:
  +                                             SpyAcknowledgementItem items[] = new 
SpyAcknowledgementItem[in.readInt()];
  +                                             for( int i=0; i < items.length; i++ )
  +                                                     items[i] = 
(SpyAcknowledgementItem)in.readObject();
  +                                             server.acknowledge(items, 
in.readBoolean(),spyDistributedConnection);
  +                                             break;
  +                                     case SetSpyDistributedConnection: 
  +                                             spyDistributedConnection = 
(SpyDistributedConnection)in.readObject();
                                                break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
  @@ -155,31 +167,45 @@
                                //Everthing was OK
                                
                                try {
  -                                     if (result==null) os.write(0);
  +                                     if (result==null) 
  +                                             out.writeByte(0);
                                        else {
  -                                             os.write(1);
  +                                             out.writeByte(1);
                                                out.writeObject(result);
                                        }
  -                                     os.flush();
  +                                     out.flush();
                                } catch (IOException e) {
  +                                     if( closed )
  +                                             break;
                                        failure("Result write",e);
  -                                     return;                                 
  +                                     break;                                  
                                }
                                
                        } catch (Exception e) {
  +                             if( closed )
  +                                     break;
   
                                try {
  -                                     os.write(2);
  +                                     out.writeByte(2);
                                        out.writeObject(e.getMessage());
  -                                     os.flush();
  +                                     out.flush();
                                } catch (IOException e2) {
                                        failure("Result write",e2);
  -                                     return;                                 
  +                                     break;                                  
                                }
                                
                        }
   
                }
  +             
  +             try {
  +                     if( !closed )
  +                             
server.connectionClosing(spyDistributedConnection,null);
  +                     socket.close();
  +             } catch (IOException e ) {
  +                     Log.log("Could not gracefully close the connection with the 
client");
  +                     Log.log(e);
  +             }
        }
        
        void failure(String st,Exception e)
  @@ -190,23 +216,11 @@
        
        void newMessage(String id,int nb,ObjectInputStream in) throws Exception
        {
  -             Log.notice("INCOMING: "+nb+" messages from "+id);
  +             SpyMessage mes[] = new SpyMessage[nb];
  +             for(int i=0;i<nb;i++) 
  +                     mes[i]=(SpyMessage)in.readObject();                     
                
  -             SpyDestination dest=null;
  -             JMSServerQueue queue=null;
  -             
  -             for(int i=0;i<nb;i++) { 
  -                     
  -                     SpyMessage mes=(SpyMessage)in.readObject();
  -                     
  -                     if (dest==null||!dest.equals(mes.jmsDestination)) {
  -                             
queue=(JMSServerQueue)server.messageQueue.get(mes.jmsDestination);
  -                             if (queue==null) throw new JMSException("This 
destination does not exist !"); //hum...
  -                     }
  -             
  -                     //Add the message to the queue          
  -                     queue.addMessage(mes);
  -             }
  +             server.newMessage(mes, id);
        }
        
        // --
  @@ -221,4 +235,5 @@
                server=s;
        }
        
  +
   }
  
  
  
  1.3       +78 -39    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
  
  Index: DistributedJMSServerOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- DistributedJMSServerOILClient.java        2000/06/20 02:19:13     1.2
  +++ DistributedJMSServerOILClient.java        2000/11/19 20:00:01     1.3
  @@ -22,7 +22,7 @@
   import java.net.InetAddress;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   
  -public class DistributedJMSServerOILClient
  +import org.spydermq.SpyAcknowledgementItem;public class 
DistributedJMSServerOILClient
        implements DistributedJMSServer, Serializable
   {
        
  @@ -37,14 +37,16 @@
        static final int ConnectionClosing = 9;
        static final int DeleteTemporaryDestination = 10;
        static final int CheckID = 11;
  -     static final int QueueReceiveNoWait = 12;
  +     static final int QueueReceive = 12;     
        static final int ConnectionListening = 13;
  +     static final int Acknowledge = 14;      
  +     static final int SetSpyDistributedConnection = 15;      
        
        //Remote stuff
        
        private transient Socket socket;
  -     private transient BufferedInputStream is;
  -     private transient BufferedOutputStream os;
  +
  +
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
        
  @@ -62,11 +64,9 @@
        {
                try {                   
                        socket=new Socket(addr,port);
  -                     is=new BufferedInputStream(socket.getInputStream());
  -                     os=new BufferedOutputStream(socket.getOutputStream());
  -                     in=new ObjectInputStream(is);
  -                     out=new ObjectOutputStream(os);
  -                     os.flush();
  +                     in=new ObjectInputStream(new 
BufferedInputStream(socket.getInputStream()));
  +                     out=new ObjectOutputStream(new 
BufferedOutputStream(socket.getOutputStream()));
  +                     out.flush();
                } catch (Exception e) {
                        failure(e);
                }
  @@ -75,8 +75,8 @@
        public Object waitAnswer() throws RemoteException
        {
                try {
  -                     os.flush();
  -                     int val=is.read();
  +                     out.flush();
  +                     int val=in.readByte();
                        if (val==0) return null;
                        if (val==1) {
                                return in.readObject();
  @@ -99,12 +99,12 @@
        
        //--- Remote Calls
        
  -    public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException
  +     public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     os.write(NewMessage);
  +                     out.writeByte(NewMessage);
                        out.writeObject(id);
                        out.writeInt(val.length);
                        for(int i=0;i<val.length;i++)
  @@ -121,7 +121,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     os.write(GetID);
  +                     out.writeByte(GetID);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -129,14 +129,13 @@
                return (String)waitAnswer();
        }
                
  -    public void subscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException, RemoteException
  +     public void subscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     os.write(Subscribe);
  +                     out.writeByte(Subscribe);
                        out.writeObject(dest);
  -                     out.writeObject(who);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -144,14 +143,13 @@
                waitAnswer();
        }
        
  -     public void unsubscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException, RemoteException
  +     public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     os.write(Unsubscribe);
  +                     out.writeByte(Unsubscribe);
                        out.writeObject(dest);
  -                     out.writeObject(who);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -164,7 +162,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     os.write(CreateTopic);
  +                     out.writeByte(CreateTopic);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  @@ -178,7 +176,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     os.write(CreateQueue);
  +                     out.writeByte(CreateQueue);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  @@ -192,8 +190,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     os.write(GetTemporaryTopic);
  -                     out.writeObject(dc);
  +                     out.writeByte(GetTemporaryTopic);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -206,8 +203,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     os.write(GetTemporaryQueue);
  -                     out.writeObject(dc);
  +                     out.writeByte(GetTemporaryQueue);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -220,8 +216,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     os.write(ConnectionClosing);
  -                     out.writeObject(dc);
  +                     out.writeByte(ConnectionClosing);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -234,7 +229,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     os.write(DeleteTemporaryDestination);
  +                     out.writeByte(DeleteTemporaryDestination);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  @@ -248,7 +243,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     os.write(CheckID);
  +                     out.writeByte(CheckID);
                        out.writeObject(ID);
                } catch (IOException e) {
                        failure(e);
  @@ -256,35 +251,79 @@
                
                waitAnswer();
        }
  +     
        
  -     public SpyMessage queueReceiveNoWait(Queue queue) throws Exception, 
RemoteException
  +     public void connectionListening(boolean mode,Destination dest, 
SpyDistributedConnection dc) throws Exception, RemoteException
        {
                if (socket==null) createConnection();
                
  +             try {
  +                     out.writeByte(ConnectionListening);
  +                     out.writeBoolean(mode);
  +                     out.writeObject(dest);
  +             } catch (IOException e) {
  +                     failure(e);
  +             }
  +             
  +             waitAnswer();
  +     }
  +     
  +     public void acknowledge(SpyAcknowledgementItem item[],boolean 
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException {
  +             if (socket==null) createConnection();
  +             
                try {
  -                     os.write(QueueReceiveNoWait);
  +                     out.writeByte(Acknowledge);
  +                     out.writeInt(item.length);
  +                     for( int i=0; i< item.length; i++ )
  +                             out.writeObject(item[i]);
  +                     out.writeBoolean(isAck);
  +             } catch (IOException e) {
  +                     failure(e);
  +             }
  +             
  +             waitAnswer();
  +     }       
  +     
  +     public SpyMessage queueReceive(Queue queue, long wait) throws Exception, 
RemoteException
  +     {
  +             if (socket==null) createConnection();
  +             
  +             try {
  +                     out.writeByte(QueueReceive);
                        out.writeObject(queue);
  +                     out.writeLong(wait);
                } catch (IOException e) {
                        failure(e);
                }
                
                return (SpyMessage)waitAnswer();
  -     }
  +     }       
        
  -     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws Exception, RemoteException
  +     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws Exception, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     os.write(ConnectionListening);
  -                     if (mode) os.write(1);
  -                     else os.write(0);                                              
  
  -                     out.writeObject(dest);
  -                     out.writeObject(dc);
  +                     out.writeByte(QueueReceive);
  +                     out.writeObject(queue);
  +                     out.writeLong(wait);
                } catch (IOException e) {
                        failure(e);
                }
                
  +             return (SpyMessage)waitAnswer();
  +     }
  +     
  +     
  +     public void setSpyDistributedConnection(SpyDistributedConnection dest) throws 
RemoteException {
  +             if (socket==null) createConnection();
  +             
  +             try {
  +                     out.writeByte( SetSpyDistributedConnection );
  +                     out.writeObject(dest);
  +             } catch (IOException e) {
  +                     failure(e);
  +             }               
                waitAnswer();
        }
        
  
  
  
  1.2       +15 -12    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java
  
  Index: DistributedJMSServerRMI.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- DistributedJMSServerRMI.java      2000/05/31 18:10:16     1.1
  +++ DistributedJMSServerRMI.java      2000/11/19 20:00:01     1.2
  @@ -12,36 +12,39 @@
   import javax.jms.Queue;
   import javax.jms.TemporaryTopic;
   import javax.jms.TemporaryQueue;
  +import java.rmi.Remote;
  +import java.rmi.RemoteException;
   import org.spydermq.SpyMessage;
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
  -import java.rmi.Remote;
  -import java.rmi.RemoteException;
  +import org.spydermq.SpyAcknowledgementItem;
   
   /**
    *   The RMI interface of the DistributedJMSServer object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote
   {  
  +             
        // Public --------------------------------------------------------
  -
        public String getID() throws JMSException, RemoteException;
  -    public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException;
  -    public void subscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException, RemoteException;
  -     public void unsubscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException, RemoteException;
  +     public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException;
        public Topic createTopic(String dest) throws JMSException, RemoteException;
        public Queue createQueue(String dest) throws JMSException, RemoteException;
  -     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException, RemoteException;
  -     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException, RemoteException;
  -     public void connectionClosing(SpyDistributedConnection dc) throws 
JMSException, RemoteException;
        public void deleteTemporaryDestination(SpyDestination dest) throws 
JMSException, RemoteException;
        public void checkID(String ID) throws JMSException, RemoteException;
  -     public SpyMessage queueReceiveNoWait(Queue queue) throws Exception, 
RemoteException;
  +     public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection 
newSpyDistributedConnection) throws RemoteException;      
  +     public void acknowledge(SpyAcknowledgementItem[] items, boolean 
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException;        
  +     public void connectionClosing(SpyDistributedConnection dc) throws 
JMSException, RemoteException;        
        public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws Exception, RemoteException;
  -
  +     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException, RemoteException;      
  +     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException, RemoteException;      
  +     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws Exception, RemoteException;   
  +     public void subscribe(Destination dest, SpyDistributedConnection dc) throws 
JMSException, RemoteException;      
  +     public void unsubscribe(Destination dest, SpyDistributedConnection dc) throws 
JMSException, RemoteException;
  +     
   }
  
  
  
  1.4       +49 -40    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java
  
  Index: DistributedJMSServerRMIImpl.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- DistributedJMSServerRMIImpl.java  2000/06/19 04:23:15     1.3
  +++ DistributedJMSServerRMIImpl.java  2000/11/19 20:00:01     1.4
  @@ -12,22 +12,23 @@
   import javax.jms.Queue;
   import javax.jms.TemporaryTopic;
   import javax.jms.TemporaryQueue;
  +import java.rmi.server.UnicastRemoteObject;
  +import java.rmi.RemoteException;
   import org.spydermq.SpyMessage;
   import org.spydermq.SpyDestination;
   import org.spydermq.JMSServer;
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.Log;
  -import java.rmi.server.UnicastRemoteObject;
  -import java.rmi.RemoteException;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  +import org.spydermq.SpyAcknowledgementItem;
   
   /**
    *   The RMI implementation of the DistributedJMSServer object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public class DistributedJMSServerRMIImpl 
        extends UnicastRemoteObject 
  @@ -46,27 +47,16 @@
        }
   
        // Public --------------------------------------------------------
  -
        public String getID() throws JMSException
        {
                return server.getID();
        }
        
  -    public void newMessage(SpyMessage val[],String id) throws JMSException
  +     public void newMessage(SpyMessage val[],String id) throws JMSException
        {
                server.newMessage(val,id);
        }
        
  -    public void subscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException
  -     {
  -             server.subscribe(dest,who);
  -     }
  -     
  -     public void unsubscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException
  -     {
  -             server.unsubscribe(dest,who);
  -     }
  -     
        public Topic createTopic(String dest) throws JMSException
        {
                return server.createTopic(dest);
  @@ -76,21 +66,6 @@
        {
                return server.createQueue(dest);
        }
  -     
  -     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException
  -     {
  -             return server.getTemporaryTopic(dc);
  -     }
  -     
  -     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException
  -     {
  -             return server.getTemporaryQueue(dc);
  -     }
  -     
  -     public void connectionClosing(SpyDistributedConnection dc) throws JMSException
  -     {
  -             server.connectionClosing(dc,null);
  -     }
   
        public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
        {
  @@ -101,16 +76,6 @@
        {
                server.checkID(ID);
        }
  -     
  -     public SpyMessage queueReceiveNoWait(Queue queue) throws JMSException
  -     {
  -             return server.queueReceiveNoWait(queue);
  -     }
  -     
  -     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws JMSException
  -     {
  -             server.connectionListening(mode,dest,dc);
  -     }
   
        // --
        
  @@ -122,6 +87,50 @@
        public void setServer(JMSServer s)
        {
                server=s;
  +     }
  +     
  +     public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection 
newSpyDistributedConnection) {
  +             // We cannot try to cache the dc since different dc's are going
  +             // to access the same remote object via RMI
  +     }
  +     
  +     public void acknowledge(SpyAcknowledgementItem[] items, boolean 
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException{
  +             server.acknowledge(items, isAck, dc);
  +     }       
  +     
  +     public void connectionClosing(SpyDistributedConnection dc) throws JMSException
  +     {
  +             server.connectionClosing(dc,null);
  +     }       
  +     
  +     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws JMSException
  +     {
  +             server.connectionListening(mode,dest,dc);
  +     }       
  +     
  +     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException
  +     {
  +             return server.getTemporaryQueue(dc);
  +     }       
  +     
  +     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException
  +     {
  +             return server.getTemporaryTopic(dc);
  +     }       
  +     
  +     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws JMSException
  +     {
  +             return server.queueReceive(queue, wait, dc);
  +     }       
  +     
  +     public void subscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException
  +     {
  +             server.subscribe(dest,dc);
  +     }       
  +     
  +     public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException
  +     {
  +             server.unsubscribe(dest,dc);
        }
        
   }
  
  
  
  1.2       +81 -127   
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java
  
  Index: ConnectionReceiverUIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ConnectionReceiverUIL.java        2000/11/16 22:46:18     1.1
  +++ ConnectionReceiverUIL.java        2000/11/19 20:00:01     1.2
  @@ -9,10 +9,10 @@
   import javax.jms.Destination;
   import javax.jms.JMSException;
   import org.spydermq.SpyConnection;
  -import org.spydermq.ConnectionQueue;
  +
   import org.spydermq.SpyMessage;
  -import org.spydermq.SpySession;
  -import org.spydermq.SessionQueue;
  +
  +
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyTopicConnection;
   import org.spydermq.SpyQueueSession;
  @@ -35,13 +35,13 @@
   import java.io.ObjectOutputStream;
   import java.io.IOException;
   
  -/**
  +import org.spydermq.SpyMessageConsumer;/**
    *   The OIL implementation of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public class ConnectionReceiverUIL 
        implements Runnable, ConnectionReceiverSetup
  @@ -71,7 +71,9 @@
   
        void exportObject()     
        {                       
  -              new Thread(this).start();
  +              Thread thread = new Thread(this, "ConnectionReceiverUIL");
  +              thread.setDaemon(true);
  +              thread.start();
        }
        
        public void run()
  @@ -94,14 +96,17 @@
                        return;
                }
   
  -             while (true) {
  +             while (!closed) {
   
                        try {
  -                             code=in.read();
  +                             code=in.readByte();
                        } catch (IOException e) {
  +                             if( closed )
  +                                     break;
  +                                     
                                failure("Command read",e);
                                e.printStackTrace();
  -                             return;
  +                             break;
                        }
                
                        try {
  @@ -130,32 +135,48 @@
                                //Everthing was OK
                                
                                try {
  -                                     out.write(0);
  +                                     out.writeByte(0);
                                        out.flush();
                                } catch (IOException e) {
  +                                     if( closed )
  +                                             break;;
  +                                             
                                        failure("Result write",e);
  -                                     return;                                 
  +                                     break;
                                }
                                
  -                             } catch (Exception e) {
  +                     } catch (Exception e) {
  +                             
  +                             if( closed )
  +                                     break;
   
                                try {
                                        if( e instanceof NoReceiverException ) {
  -                                             out.write(2);
  +                                             out.writeByte(2);
                                                out.writeObject(e.getMessage());
                                        } else {
  -                                             out.write(1);
  +                                             out.writeByte(1);
                                                out.writeObject(e);
                                        }
                                        out.flush();
                                } catch (IOException e2) {
                                        failure("Result write",e2);
  -                                     return;                                 
  +                                     break;                                  
                                }
                                
                        }
   
                }
  +             
  +             try {
  +                     Log.log("Closing receiver connections.");
  +                     out.close();
  +                     in.close();
  +             } catch ( IOException e ) {
  +                     Log.log("Error whle closing receiver connections ");
  +                     Log.log(e);
  +                     return;                                 
  +             }
        }
        
        void failure(String st,Exception e)
  @@ -179,143 +200,76 @@
                return new ConnectionReceiverUILClient();
        }
        
  -     // ---
  -     
  -     //<DEBUG>
  -     
  -     /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  -     {
  -             connection.rec++;
  -     }
  -     
  -     public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
JMSException
  -     {
  -             connection.rec++;
  -     }*/
  -     
  -     //</DEBUG>
  -     
  -     
        //A message has arrived for this Connection, We have to dispatch it to the 
sessions
  -     public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
  -     {               
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             
  -             Log.log("ConnectionReceiver: 
Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
  -             
  +     public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +
  +             Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() + 
",Mes=" + mes.toString() + ")");
  +
                if (connection instanceof SpyTopicConnection) {
  -             
  +
                        //Get the set of subscribers for this Topic
  -             
  -                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);             
  -                     if (connectionQueue==null) return;
  -     
  -                     Iterator i=connectionQueue.subscribers.iterator();
  -                                     
  -                     while (i.hasNext()) {                                          
                 
  -                                     
  -                             SpySession session=(SpySession)i.next();
  -                                     
  -                             //add the new message to the session's queue
  -                             session.dispatchMessage(dest,mes);
  -                                     
  +                     SpyMessageConsumer consumers[] = connection.getConsumers(dest);
  +
  +                     for (int i = 0; i < consumers.length; i++) {
  +
  +                             //add the new message to the consumer's queue
  +                             consumers[i].addMessage(mes);
  +
                                //There is work to do... 
  -                             session.mutex.notifyLock();
  +                             consumers[i].session.mutex.notifyLock();
                        }
  +
                } else {
  -                     
  -                     while (true) {
  -                             
  -                             SessionQueue sq=null;
   
  -                             try {
  -                     
  -                                     //Find one session waiting for this Queue
  -                                     if (connection.modeStop) throw new 
Exception("This connection is stopped !");
  -                                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
  -                                     if (connectionQueue==null) throw new 
Exception("There is no connectionQueue for this destination !");
  -                                     
  -                                     synchronized (connectionQueue) {
  -                                             
  -                                             //Find a SessionQueue
  -                                             if 
(connectionQueue.NumListeningSessions==0) throw new NoReceiverException("There are no 
listening sessions for this destination !");
  -                     
  -                                             Iterator 
i=connectionQueue.subscribers.iterator();
  -                                             while (i.hasNext()) {
  -                                                     SpySession 
session=(SpySession)i.next();
  -                                                     
sq=(SessionQueue)session.destinations.get(dest);
  -                                                     if 
(sq.NumListeningSubscribers!=0) break;
  -                                             }
  -                                             if 
(sq==null||sq.NumListeningSubscribers==0) {
  -                                                     Log.error("FIXME: The 
listeners count was invalid !");
  -                                                     throw new 
NoReceiverException("There are no listening sessions for this destination !");
  -                                             }
  -                                     
  -                                             //Try with this sessionQueue
  -                                             Log.log("Dispatching to SessionQueue: 
"+mes);
  -                                             sq.dispatchMessage(dest,mes);
  -                             
  -                                             //Our work is done here
  -                                             break;
  -                                     }
  +                     //Find one session waiting for this Queue
  +                     if (connection.modeStop)
  +                             throw new JMSException("This connection is stopped !");
  +
  +                     SpyMessageConsumer consumer = 
connection.pickListeningConsumer(dest);
  +                     if (consumer == null)
  +                             throw new NoReceiverException("There are no listening 
sessions for this destination !");
  +
  +                     //Try with this sessionQueue
  +                     Log.log("Dispatching to SessionQueue: " + mes);
  +                     ((org.spydermq.SpyQueueReceiver) 
consumer).dispatchMessage(mes);
   
  -                             } catch (NoReceiverException e) {
  -                                     //This SessionQueue should not have been 
registered !
  -                                     throw e;
  -                             } catch (Exception e) {
  -                                     //This error is non-recoverable : we must 
unregister from this queue
  -                                     //Let the JMSServerQueue do its work
  -                                     Log.log(e);
  -                                     throw new JMSException("There are no listening 
sessions in this connection");
  -                             }
  -                     }
  -                     
                }
  -                     
        } 
   
  -     public void receiveMultiple(SpyDestination dest,int nb,ObjectInputStream in) 
throws Exception
  -     {               
  -             if (closed) throw new IllegalStateException("The connection is 
closed");
  -             
  +     public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in) 
throws Exception {
  +             if (closed)
  +                     throw new IllegalStateException("The connection is closed");
  +
                Log.log("ConnectionReceiver: ReceiveMultiple()");
  -             
  +
                if (connection instanceof SpyTopicConnection) {
  -             
  +
                        //Get the set of subscribers for this Topic
  -             
  -                     ConnectionQueue 
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);             
  -                     if (connectionQueue==null) return;
  -     
  -                     for(int val=0;val<nb;val++) {
  -                     
  -                             SpyMessage mes=(SpyMessage)in.readObject();
  -                             
  -                             //NL: i is a short-lived object. Try to "group" 
messages in an pre-allocated/peristant
  -                             //array and apply the same iterator on this array
  -                             Iterator i=connectionQueue.subscribers.iterator();
  -                                     
  -                             while (i.hasNext()) {                                  
                         
  -                             
  -                                     SpySession session=(SpySession)i.next();
  -                                     
  -                                     //add the new message to the session's queue
  -                                     session.dispatchMessage(dest,mes);
  -                             
  +                     SpyMessageConsumer consumers[] = connection.getConsumers(dest);
  +
  +                     for (int val = 0; val < nb; val++) {
  +                             SpyMessage mes = (SpyMessage) in.readObject();
  +
  +                             for (int i = 0; i < consumers.length; i++) {
  +
  +                                     //add the new message to the consumer's queue
  +                                     consumers[i].addMessage(mes);
  +
                                        //There is work to do... 
  -                                     session.mutex.notifyLock();
  +                                     consumers[i].session.mutex.notifyLock();
                                }
                        }
  -                     
                } else {
  -                     throw new Exception("Multiple dispatch for a Queue");          
         
  +                     throw new Exception("Multiple dispatch for a Queue");
                }
        } 
        
        
        public void close() throws Exception
        {
  -             closed=true;            
  +             closed=true;
        }
   
        //One TemporaryDestination has been deleted
  
  
  
  1.2       +11 -15    
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java
  
  Index: ConnectionReceiverUILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- ConnectionReceiverUILClient.java  2000/11/16 22:46:19     1.1
  +++ ConnectionReceiverUILClient.java  2000/11/19 20:00:02     1.2
  @@ -30,7 +30,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   import org.spydermq.multiplexor.SocketMultiplexor;public class 
ConnectionReceiverUILClient
        implements ConnectionReceiver, Serializable
  @@ -40,17 +40,11 @@
        static final int DELETE_TEMPORARY_DESTINATION = 3;
        static final int CLOSE = 4;             
        
  -
  -
  -
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
  -     
  -
  -
  +     transient SocketMultiplexor mSocket;
        
  -
  -     
  +             
        void createConnection() throws RemoteException
        {
                try {
  @@ -68,7 +62,7 @@
                Exception throwException=null;
                try {
                        out.flush();
  -                     int val=in.read();
  +                     int val=in.readByte();
                        switch(val) {
                        case 1:                         
                                Exception e=(Exception)in.readObject();
  @@ -91,7 +85,7 @@
        public void receive(SpyDestination dest,SpyMessage mes) throws Exception
        {               
                if (out==null) createConnection();
  -             out.write(RECEIVE);
  +             out.writeByte(RECEIVE);
                out.writeObject(dest);
                out.writeObject(mes);
                waitAnswer();
  @@ -100,7 +94,7 @@
        public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
Exception      
        {
                if (out==null) createConnection();
  -             out.write(RECEIVE_MULTIPLE);
  +             out.writeByte(RECEIVE_MULTIPLE);
                out.writeObject(dest);
                out.writeInt(mes.length);
                for(int i=0;i<mes.length;i++)
  @@ -111,7 +105,7 @@
        public void deleteTemporaryDestination(SpyDestination dest) throws Exception
        {
                if (out==null) createConnection();
  -             out.write(DELETE_TEMPORARY_DESTINATION);
  +             out.writeByte(DELETE_TEMPORARY_DESTINATION);
                out.writeObject(dest);
                waitAnswer();
        }       
  @@ -119,10 +113,12 @@
        public void close() throws Exception
        {
                if (out==null) createConnection();
  -             out.write(CLOSE);
  +             out.writeByte(CLOSE);
                waitAnswer();
        }
        
  -     transient SocketMultiplexor mSocket;    public ConnectionReceiverUILClient()
  +     public ConnectionReceiverUILClient()
        {
  -     }}
  +     }
  +     
  +}
  
  
  
  1.2       +58 -41    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java
  
  Index: DistributedJMSServerUIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- DistributedJMSServerUIL.java      2000/11/16 22:46:19     1.1
  +++ DistributedJMSServerUIL.java      2000/11/19 20:00:02     1.2
  @@ -36,9 +36,9 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
  -import org.spydermq.multiplexor.SocketMultiplexor;public class 
DistributedJMSServerUIL
  +import org.spydermq.multiplexor.SocketMultiplexor;import 
org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerUIL
        implements Runnable, DistributedJMSServerSetup, DistributedJMSServerUILMBean
   {
   
  @@ -55,7 +55,7 @@
        }
   
        // Internals -----------------------------------------------------
  -     
  +
        static final int GetID = 1;
        static final int NewMessage = 2;
        static final int Subscribe = 3;
  @@ -67,8 +67,10 @@
        static final int ConnectionClosing = 9;
        static final int DeleteTemporaryDestination = 10;
        static final int CheckID = 11;
  -     static final int QueueReceiveNoWait = 12;
  +     static final int QueueReceive = 12;     
        static final int ConnectionListening = 13;
  +     static final int Acknowledge = 14;      
  +     static final int SetSpyDistributedConnection = 15;
        
        private ServerSocket serverSocket;
   
  @@ -89,6 +91,8 @@
                int code = 0;
                ObjectOutputStream out=null;
                ObjectInputStream in=null;
  +             SpyDistributedConnection spyDistributedConnection=null;
  +             boolean closed = false;
                      
                try {
                        socket = serverSocket.accept();
  @@ -111,14 +115,14 @@
                }
   
                
  -             while (true) {
  +             while (!closed) {
   
                        try {
  -                             code=in.read();         
  +                             code=in.readByte();             
                        } catch (IOException e) {
                                Log.notice("Command read");
                                Log.notice(e);
  -                             return;
  +                             break;
                        }
                
                        try {
  @@ -134,14 +138,10 @@
                                                
newMessage((String)in.readObject(),in.readInt(),in);
                                                break;
                                        case Subscribe:
  -                                             Destination d = 
(Destination)in.readObject();
  -                                             SpyDistributedConnection c = 
(SpyDistributedConnection)in.readObject();
  -                                             if( c.cr instanceof 
ConnectionReceiverUILClient ) 
  -                                                     
((ConnectionReceiverUILClient)c.cr).mSocket = mSocket;
  -                                             server.subscribe(d,c);
  +                                             
server.subscribe((Destination)in.readObject(),spyDistributedConnection);
                                                break;
                                        case Unsubscribe: 
  -                                             
server.unsubscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
  +                                             
server.unsubscribe((Destination)in.readObject(),spyDistributedConnection);
                                                break;
                                        case CreateTopic: 
                                                
result=(Topic)server.createTopic((String)in.readObject());
  @@ -150,13 +150,14 @@
                                                
result=(Queue)server.createQueue((String)in.readObject());
                                                break;
                                        case GetTemporaryTopic:
  -                                             
result=(TemporaryTopic)server.getTemporaryTopic((SpyDistributedConnection)in.readObject());
  +                                             
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
                                                break;
                                        case GetTemporaryQueue: 
  -                                             
result=(TemporaryQueue)server.getTemporaryQueue((SpyDistributedConnection)in.readObject());
  +                                             
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
                                                break;
                                        case ConnectionClosing: 
  -                                             
server.connectionClosing((SpyDistributedConnection)in.readObject(),null);
  +                                             
server.connectionClosing(spyDistributedConnection,null);
  +                                             closed = true;
                                                break;
                                        case DeleteTemporaryDestination: 
                                                
server.deleteTemporaryDestination((SpyDestination)in.readObject());
  @@ -164,12 +165,25 @@
                                        case CheckID: 
                                                
server.checkID((String)in.readObject());
                                                break;
  -                                     case QueueReceiveNoWait:
  -                                             
result=server.queueReceiveNoWait((Queue)in.readObject());
  +                                     case QueueReceive:
  +                                             
result=server.queueReceive((Queue)in.readObject(), 
in.readLong(),spyDistributedConnection);
                                                break;
                                        case ConnectionListening: 
  -                                             
server.connectionListening(((in.read())==1),(Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
  +                                             
server.connectionListening(in.readBoolean(),(Destination)in.readObject(),spyDistributedConnection);
                                                break;
  +                                     case Acknowledge:
  +                                             SpyAcknowledgementItem items[] = new 
SpyAcknowledgementItem[in.readInt()];
  +                                             for( int i=0; i < items.length; i++ )
  +                                                     items[i] = 
(SpyAcknowledgementItem)in.readObject();
  +                                             server.acknowledge(items, 
in.readBoolean(),spyDistributedConnection);
  +                                             break;
  +                                     case SetSpyDistributedConnection: 
  +                                             spyDistributedConnection = 
(SpyDistributedConnection)in.readObject();
  +                                             if( spyDistributedConnection.cr 
instanceof ConnectionReceiverUILClient ) {
  +                                                     
((ConnectionReceiverUILClient)spyDistributedConnection.cr).mSocket = mSocket;
  +                                                     
((ConnectionReceiverUILClient)spyDistributedConnection.cr).createConnection();
  +                                             }
  +                                             break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
  @@ -177,29 +191,45 @@
                                //Everthing was OK
                                
                                try {
  -                                     if (result==null) out.write(0);
  +                                     if (result==null) 
  +                                             out.writeByte(0);
                                        else {
  -                                             out.write(1);
  +                                             out.writeByte(1);
                                                out.writeObject(result);
                                        }
                                        out.flush();
                                } catch (IOException e) {
  +                                     if( closed )
  +                                             break;
                                        failure("Result write",e);
  -                                     return;                                 
  +                                     break;                                  
                                }
                                
                        } catch (Exception e) {
  +                             if( closed )
  +                                     break;
   
                                try {
  -                                     out.write(2);
  +                                     out.writeByte(2);
                                        out.writeObject(e);
                                        out.flush();
                                } catch (IOException e2) {
                                        failure("Result write",e2);
  -                                     return;                                 
  +                                     break;                                  
                                }                               
                        }
                }
  +
  +             try {
  +
  +                     if( !closed )
  +                             
server.connectionClosing(spyDistributedConnection,null);
  +
  +                     mSocket.close();
  +             } catch ( IOException e ) {
  +                     Log.log("Could not gracefully close the connection to the 
server.");
  +                     Log.log(e);
  +             }
        }
        
        void failure(String st,Exception e)
  @@ -210,27 +240,14 @@
        
        void newMessage(String id,int nb,ObjectInputStream in) throws Exception
        {
  -             Log.notice("INCOMING: "+nb+" messages from "+id);
  -             
  -             SpyDestination dest=null;
  -             JMSServerQueue queue=null;
  -             
  -             for(int i=0;i<nb;i++) { 
  -                     
  -                     SpyMessage mes=(SpyMessage)in.readObject();
  -                     
  -                     if (dest==null||!dest.equals(mes.jmsDestination)) {
  -                             
queue=(JMSServerQueue)server.messageQueue.get(mes.jmsDestination);
  -                             if (queue==null) throw new JMSException("This 
destination does not exist !"); //hum...
  -                     }
  +             SpyMessage mes[] = new SpyMessage[nb];
  +             for(int i=0;i<nb;i++) 
  +                     mes[i]=(SpyMessage)in.readObject();                     
                
  -                     //Add the message to the queue          
  -                     queue.addMessage(mes);
  -             }
  +             server.newMessage(mes, id);
        }
  -     
  -     // --
        
  +     // --   
        public DistributedJMSServer createClient() throws Exception
        {               
                return new 
DistributedJMSServerUILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
  
  
  
  1.2       +93 -58    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java
  
  Index: DistributedJMSServerUILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- DistributedJMSServerUILClient.java        2000/11/16 22:46:20     1.1
  +++ DistributedJMSServerUILClient.java        2000/11/19 20:00:02     1.2
  @@ -34,9 +34,9 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
  -import org.spydermq.multiplexor.SocketMultiplexor;public class 
DistributedJMSServerUILClient
  +import org.spydermq.multiplexor.SocketMultiplexor;import 
org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerUILClient
        implements DistributedJMSServer, Serializable
   {
        
  @@ -51,19 +51,22 @@
        static final int ConnectionClouting = 9;
        static final int DeleteTemporaryDestination = 10;
        static final int CheckID = 11;
  -     static final int QueueReceiveNoWait = 12;
  +     static final int QueueReceive = 12;             
        static final int ConnectionListening = 13;
  +     static final int Acknowledge = 14;                      
  +     static final int SetSpyDistributedConnection = 15;      
        
  +             
        //Remote stuff
  +     private int port;
  +     private InetAddress addr;
        
        private transient Socket socket;
  +     transient SocketMultiplexor mSocket;    
   
  -
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
        
  -     private int port;
  -     private InetAddress addr;
        
        public DistributedJMSServerUILClient(InetAddress addr,int port)
        {
  @@ -95,7 +98,7 @@
        {
                try {
                        out.flush();
  -                     int val=in.read();
  +                     int val=in.readByte();
                        if (val==0) return null;
                        if (val==1) {
                                return in.readObject();
  @@ -126,7 +129,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     out.write(NewMessage);
  +                     out.writeByte(NewMessage);
                        out.writeObject(id);
                        out.writeInt(val.length);
                        for(int i=0;i<val.length;i++)
  @@ -143,7 +146,7 @@
                if (socket==null) createConnection();
                
                try {
  -                     out.write(GetID);
  +                     out.writeByte(GetID);
                } catch (IOException e) {
                        failure(e);
                }
  @@ -151,162 +154,194 @@
                return (String)waitAnswer();
        }
                
  -     public void subscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException, RemoteException
  +
  +     
  +
  +     
  +     public Topic createTopic(String dest) throws JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(Subscribe);
  +                     out.writeByte(CreateTopic);
                        out.writeObject(dest);
  -                     out.writeObject(who);
                } catch (IOException e) {
                        failure(e);
                }
                
  -             waitAnswer();
  +             return (Topic)waitAnswer();
        }
        
  -     public void unsubscribe(Destination dest,SpyDistributedConnection who) throws 
JMSException, RemoteException
  +     public Queue createQueue(String dest) throws JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(Unsubscribe);
  +                     out.writeByte(CreateQueue);
                        out.writeObject(dest);
  -                     out.writeObject(who);
                } catch (IOException e) {
                        failure(e);
                }
                
  -             waitAnswer();
  +             return (Queue)waitAnswer();
        }
        
  -     public Topic createTopic(String dest) throws JMSException, RemoteException
  +
  +     
  +
  +     
  +
  +     
  +     public void deleteTemporaryDestination(SpyDestination dest) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(CreateTopic);
  +                     out.writeByte(DeleteTemporaryDestination);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
                }
                
  -             return (Topic)waitAnswer();
  +             waitAnswer();
        }
        
  -     public Queue createQueue(String dest) throws JMSException, RemoteException
  +     public void checkID(String ID) throws JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(CreateQueue);
  -                     out.writeObject(dest);
  +                     out.writeByte(CheckID);
  +                     out.writeObject(ID);
                } catch (IOException e) {
                        failure(e);
                }
                
  -             return (Queue)waitAnswer();
  +             waitAnswer();
        }
        
  -     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  +     public void setSpyDistributedConnection(SpyDistributedConnection dest) throws 
RemoteException {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(GetTemporaryTopic);
  -                     out.writeObject(dc);
  +                     out.writeByte( SetSpyDistributedConnection );
  +                     out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  +             }               
  +             waitAnswer();
  +     }       
  +     
  +     public void acknowledge(SpyAcknowledgementItem item[],boolean 
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException {
  +             if (socket==null) createConnection();
  +             
  +             try {
  +                     out.writeByte(Acknowledge);
  +                     out.writeInt(item.length);
  +                     for( int i=0; i< item.length; i++ )
  +                             out.writeObject(item[i]);
  +                     out.writeBoolean(isAck);
  +             } catch (IOException e) {
  +                     failure(e);
                }
                
  -             return (TemporaryTopic)waitAnswer();            
  -     }
  +             waitAnswer();
  +     }       
        
  -     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException, RemoteException
  +     public void connectionClosing(SpyDistributedConnection dc) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(GetTemporaryQueue);
  -                     out.writeObject(dc);
  +                     out.writeByte(ConnectionClouting);
                } catch (IOException e) {
                        failure(e);
                }
  -             
  -             return (TemporaryQueue)waitAnswer();            
  -     }
  -     
  -
  +             waitAnswer();
  +     }       
        
  -     public void deleteTemporaryDestination(SpyDestination dest) throws 
JMSException, RemoteException
  +     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws Exception, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(DeleteTemporaryDestination);
  +                     out.writeByte(ConnectionListening);
  +                     out.writeBoolean(mode);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
                }
                
                waitAnswer();
  -     }
  +     }       
        
  -     public void checkID(String ID) throws JMSException, RemoteException
  +     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(CheckID);
  -                     out.writeObject(ID);
  +                     out.writeByte(GetTemporaryQueue);
                } catch (IOException e) {
                        failure(e);
                }
                
  -             waitAnswer();
  -     }
  +             return (TemporaryQueue)waitAnswer();            
  +     }       
        
  -     public SpyMessage queueReceiveNoWait(Queue queue) throws Exception, 
RemoteException
  +     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(QueueReceiveNoWait);
  +                     out.writeByte(GetTemporaryTopic);
  +             } catch (IOException e) {
  +                     failure(e);
  +             }
  +             
  +             return (TemporaryTopic)waitAnswer();            
  +     }       
  +     
  +     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws Exception, RemoteException
  +     {
  +             if (socket==null) createConnection();
  +             
  +             try {
  +                     out.writeByte(QueueReceive);
                        out.writeObject(queue);
  +                     out.writeLong(wait);
                } catch (IOException e) {
                        failure(e);
                }
                
                return (SpyMessage)waitAnswer();
  -     }
  +     }       
        
  -     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws Exception, RemoteException
  +     public void subscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(ConnectionListening);
  -                     if (mode) out.write(1);
  -                     else out.write(0);                                             
  
  +                     out.writeByte(Subscribe);
                        out.writeObject(dest);
  -                     out.writeObject(dc);
                } catch (IOException e) {
                        failure(e);
                }
                
                waitAnswer();
  -     }
  +     }       
        
  -     static final int ConnectionOneTimeListener = 14;                transient 
SocketMultiplexor mSocket;    public void connectionClosing(SpyDistributedConnection 
dc) throws JMSException, RemoteException
  +     public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException, RemoteException
        {
                if (socket==null) createConnection();
                
                try {
  -                     out.write(ConnectionClouting);
  -                     out.writeObject(dc);
  +                     out.writeByte(Unsubscribe);
  +                     out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
                }
  +             
                waitAnswer();
  -     }}
  +     }
  +     
  +}
  
  
  
  1.2       +0 -0      
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILMBean.java
  
  Index: DistributedJMSServerUILMBean.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILMBean.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- DistributedJMSServerUILMBean.java 2000/11/16 22:46:20     1.1
  +++ DistributedJMSServerUILMBean.java 2000/11/19 20:00:02     1.2
  @@ -14,7 +14,7 @@
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.1 $
  + *   @version $Revision: 1.2 $
    */
   public interface DistributedJMSServerUILMBean
   {
  
  
  

Reply via email to