User: hiram   
  Date: 00/12/11 21:58:54

  Modified:    src/java/org/spydermq/distributed/server
                        DistributedJMSServerRMIImpl.java
                        ConnectionReceiverRMIImpl.java
                        DistributedJMSServerOILClient.java
                        ConnectionReceiverOILClient.java
                        ConnectionReceiverUILClient.java
                        DistributedJMSServerRMIImplMBean.java
                        DistributedJMSServerUIL.java
                        DistributedJMSServerRMI.java
                        ConnectionReceiverOIL.java
                        DistributedJMSServerUILClient.java
                        DistributedJMSServerOIL.java
                        ConnectionReceiverRMI.java
  Added:       src/java/org/spydermq/distributed/server
                        DistributedConnectionFactoryRMIImplMBean.java
                        DistributedConnectionFactoryRMI.java
                        DistributedConnectionFactoryRMIImpl.java
  Removed:     src/java/org/spydermq/distributed/server
                        DistributedQueueConnectionFactoryRMI.java
                        DistributedTopicConnectionFactoryRMI.java
                        DistributedTopicConnectionFactoryRMIImplMBean.java
                        DistributedQueueConnectionFactoryRMIImpl.java
                        DistributedQueueConnectionFactoryRMIImplMBean.java
                        DistributedTopicConnectionFactoryRMIImpl.java
  Log:
  Several Chanages:
  - Invocation layer simplified by joing the DistributedQueueConnectionFactory and 
DistributedTopicConnectionFactory into DistributedConnectionFactory
  - Seperated server code from client code ( server code moved to org.spydermq.server )
  - All publish() calls are now sync to the provider.
  - Added a Transaction class to better represent a commit/rollback request to the 
server.
  - Now have a InvocationLayerFactory so that we can potentialy load multiple 
invocation layers (OIL/UIL/RMI) at the same time.
  
  Revision  Changes    Path
  1.5       +22 -26    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java
  
  Index: DistributedJMSServerRMIImpl.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- DistributedJMSServerRMIImpl.java  2000/11/19 20:00:01     1.4
  +++ DistributedJMSServerRMIImpl.java  2000/12/12 05:58:50     1.5
  @@ -16,21 +16,15 @@
   import java.rmi.RemoteException;
   import org.spydermq.SpyMessage;
   import org.spydermq.SpyDestination;
  -import org.spydermq.JMSServer;
  +
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.Log;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
   import org.spydermq.SpyAcknowledgementItem;
   
  -/**
  - *   The RMI implementation of the DistributedJMSServer object
  - *      
  - *   @author Norbert Lataille ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.4 $
  - */
  -public class DistributedJMSServerRMIImpl 
  +import org.spydermq.server.JMSServer;
  +import org.spydermq.Transaction;public class DistributedJMSServerRMIImpl 
        extends UnicastRemoteObject 
        implements DistributedJMSServerRMI, DistributedJMSServerRMIImplMBean, 
DistributedJMSServerSetup
   { 
  @@ -52,22 +46,22 @@
                return server.getID();
        }
        
  -     public void newMessage(SpyMessage val[],String id) throws JMSException
  +     public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws 
JMSException
        {
  -             server.newMessage(val,id);
  +             server.addMessage(dc, val);
        }
        
  -     public Topic createTopic(String dest) throws JMSException
  +     public Topic createTopic(SpyDistributedConnection dc, String dest) throws 
JMSException
        {
                return server.createTopic(dest);
        }
        
  -     public Queue createQueue(String dest) throws JMSException
  +     public Queue createQueue(SpyDistributedConnection dc, String dest) throws 
JMSException
        {
                return server.createQueue(dest);
        }
   
  -     public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
  +     public void deleteTemporaryDestination(SpyDistributedConnection dc, 
SpyDestination dest) throws JMSException
        {
                server.deleteTemporaryDestination(dest);
        }
  @@ -89,13 +83,13 @@
                server=s;
        }
        
  -     public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection 
newSpyDistributedConnection) {
  +     public void setSpyDistributedConnection(SpyDistributedConnection 
newSpyDistributedConnection) {
                // We cannot try to cache the dc since different dc's are going
                // to access the same remote object via RMI
        }
        
  -     public void acknowledge(SpyAcknowledgementItem[] items, boolean 
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException{
  -             server.acknowledge(items, isAck, dc);
  +     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item) throws JMSException, RemoteException{
  +             server.acknowledge(dc, item);
        }       
        
        public void connectionClosing(SpyDistributedConnection dc) throws JMSException
  @@ -103,9 +97,9 @@
                server.connectionClosing(dc,null);
        }       
        
  -     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws JMSException
  +     public void connectionListening(SpyDistributedConnection dc,boolean 
mode,Destination dest) throws JMSException
        {
  -             server.connectionListening(mode,dest,dc);
  +             server.connectionListening(dc,mode,dest);
        }       
        
        public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException
  @@ -118,19 +112,21 @@
                return server.getTemporaryTopic(dc);
        }       
        
  -     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws JMSException
  +     public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long 
wait) throws JMSException
        {
  -             return server.queueReceive(queue, wait, dc);
  +             return server.queueReceive(dc,queue, wait);
        }       
        
  -     public void subscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException
  +     public void subscribe(SpyDistributedConnection dc, Destination dest) throws 
JMSException
        {
  -             server.subscribe(dest,dc);
  +             server.subscribe(dc,dest);
        }       
        
  -     public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException
  +     public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws 
JMSException
        {
  -             server.unsubscribe(dest,dc);
  +             server.unsubscribe(dc,dest);
        }
        
  -}
  +     public void transact(org.spydermq.SpyDistributedConnection dc, Transaction t) 
throws JMSException {
  +             server.transact(dc,t);
  +     }}
  
  
  
  1.13      +8 -6      
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
  
  Index: ConnectionReceiverRMIImpl.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- ConnectionReceiverRMIImpl.java    2000/11/19 20:00:01     1.12
  +++ ConnectionReceiverRMIImpl.java    2000/12/12 05:58:50     1.13
  @@ -8,30 +8,32 @@
   
   import javax.jms.Destination;
   import javax.jms.JMSException;
  -import org.spydermq.SpyConnection;
   
  +import org.spydermq.SpyConnection;
   import org.spydermq.SpyMessage;
   import org.spydermq.SpySession;
  -
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyTopicConnection;
   import org.spydermq.SpyQueueSession;
   import org.spydermq.Log;
   import org.spydermq.NoReceiverException;
  +import org.spydermq.SpyMessageConsumer;
  +import org.spydermq.distributed.interfaces.ConnectionReceiver;
  +import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
  +
   import java.rmi.RemoteException; 
   import java.rmi.server.UnicastRemoteObject;
   import java.util.Hashtable;
   import java.util.HashSet;
   import java.util.Iterator;
  -import org.spydermq.distributed.interfaces.ConnectionReceiver;
  -import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
   
  -import org.spydermq.SpyMessageConsumer;/**
  +/**
    *   The RMI implementation of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.12 $
  + *   @version $Revision: 1.13 $
    */
   public class ConnectionReceiverRMIImpl 
        extends UnicastRemoteObject 
  
  
  
  1.5       +27 -288   
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
  
  Index: DistributedJMSServerOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- DistributedJMSServerOILClient.java        2000/11/29 17:06:36     1.4
  +++ DistributedJMSServerOILClient.java        2000/12/12 05:58:50     1.5
  @@ -1,3 +1,9 @@
  +/*
  + * spyderMQ, the OpenSource JMS implementation
  + *
  + * Distributable under GPL license.
  + * See terms of license at gnu.org.
  + */
   package org.spydermq.distributed.server;
   
   import javax.jms.JMSException;
  @@ -6,11 +12,15 @@
   import javax.jms.Queue;
   import javax.jms.TemporaryTopic;
   import javax.jms.TemporaryQueue;
  +
   import org.spydermq.SpyMessage;
   import org.spydermq.SpyDestination;
  -import org.spydermq.JMSServer;
   import org.spydermq.Log;
   import org.spydermq.SpyDistributedConnection;
  +import org.spydermq.distributed.interfaces.DistributedJMSServer;
  +import org.spydermq.SpyAcknowledgementItem;
  +import org.spydermq.server.JMSServer;
  +
   import java.rmi.RemoteException;
   import java.io.ObjectOutputStream;
   import java.io.ObjectInputStream;
  @@ -20,47 +30,27 @@
   import java.io.Serializable;
   import java.net.Socket;
   import java.net.InetAddress;
  -import org.spydermq.distributed.interfaces.DistributedJMSServer;
  -
  -import org.spydermq.SpyAcknowledgementItem;public class 
DistributedJMSServerOILClient
  -     implements DistributedJMSServer, Serializable
  -{
  -     
  -     static final int GetID = 1;
  -     static final int NewMessage = 2;
  -     static final int Subscribe = 3;
  -     static final int Unsubscribe = 4;
  -     static final int CreateTopic = 5;
  -     static final int CreateQueue = 6;
  -     static final int GetTemporaryTopic = 7;
  -     static final int GetTemporaryQueue = 8;
  -     static final int ConnectionClosing = 9;
  -     static final int DeleteTemporaryDestination = 10;
  -     static final int CheckID = 11;
  -     static final int QueueReceive = 12;     
  -     static final int ConnectionListening = 13;
  -     static final int Acknowledge = 14;      
  -     static final int SetSpyDistributedConnection = 15;      
  -     
  -     //Remote stuff
  -     
  -     private transient Socket socket;
  -
   
  -     private transient ObjectOutputStream out;
  -     private transient ObjectInputStream in;
  +/**
  + *The OIL implementation of the DistributedJMSServer object
  + *      
  + *@author Norbert Lataille ([EMAIL PROTECTED])
  + *@author Hiram Chirino ([EMAIL PROTECTED])
  + * 
  + *@version $Revision: 1.5 $
  + */
  +public class DistributedJMSServerOILClient   extends DistributedJMSServerUILClient
  +     implements DistributedJMSServer, Serializable {
        
  -     private int port;
  -     private InetAddress addr;
  +     Socket socket;
        
        public DistributedJMSServerOILClient(InetAddress addr,int port)
        {
  +             super(addr,port);
                socket=null;
  -             this.port=port;
  -             this.addr=addr;
        }
        
  -     void createConnection() throws RemoteException
  +     protected void createConnection() throws RemoteException
        {
                try {                   
                        socket=new Socket(addr,port);
  @@ -72,260 +62,9 @@
                }
        }
        
  -     public Object waitAnswer() throws RemoteException
  -     {
  -             try {
  -                     out.reset();
  -                     out.flush();
  -                     int val=in.readByte();
  -                     if (val==0) return null;
  -                     if (val==1) {
  -                             return in.readObject();
  -                     } else {
  -                             String st=(String)in.readObject();
  -                             throw new RemoteException(st);  
  -                     }
  -             } catch (Exception e) {
  -                     failure(e);
  -                     return null;
  -             }
  -             
  -     }       
  -     
  -     void failure(Exception e) throws RemoteException
  -     {
  -             Log.error(e);
  -             throw new RemoteException("Cannot contact the remote object");
  -     }       
  -     
  -     //--- Remote Calls
  -     
  -     public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(NewMessage);
  -                     out.writeObject(id);
  -                     out.writeInt(val.length);
  -                     for(int i=0;i<val.length;i++)
  -                             out.writeObject(val[i]);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
  +     protected void checkConnection() throws RemoteException {
  +             if (socket == null)
  +                     createConnection();
        }
   
  -     public String getID() throws Exception
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(GetID);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             return (String)waitAnswer();
  -     }
  -             
  -     public void subscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(Subscribe);
  -                     out.writeObject(dest);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
  -     }
  -     
  -     public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(Unsubscribe);
  -                     out.writeObject(dest);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
  -     }
  -     
  -     public Topic createTopic(String dest) throws JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(CreateTopic);
  -                     out.writeObject(dest);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             return (Topic)waitAnswer();
  -     }
  -     
  -     public Queue createQueue(String dest) throws JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(CreateQueue);
  -                     out.writeObject(dest);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             return (Queue)waitAnswer();
  -     }
  -     
  -     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(GetTemporaryTopic);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             return (TemporaryTopic)waitAnswer();            
  -     }
  -     
  -     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(GetTemporaryQueue);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             return (TemporaryQueue)waitAnswer();            
  -     }
  -     
  -     public void connectionClosing(SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(ConnectionClosing);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
  -     }
  -     
  -     public void deleteTemporaryDestination(SpyDestination dest) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(DeleteTemporaryDestination);
  -                     out.writeObject(dest);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
  -     }
  -     
  -     public void checkID(String ID) throws JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(CheckID);
  -                     out.writeObject(ID);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
  -     }
  -     
  -     
  -     public void connectionListening(boolean mode,Destination dest, 
SpyDistributedConnection dc) throws Exception, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(ConnectionListening);
  -                     out.writeBoolean(mode);
  -                     out.writeObject(dest);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
  -     }
  -     
  -     public void acknowledge(SpyAcknowledgementItem item[],boolean 
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(Acknowledge);
  -                     out.writeInt(item.length);
  -                     for( int i=0; i< item.length; i++ )
  -                             out.writeObject(item[i]);
  -                     out.writeBoolean(isAck);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
  -     }       
  -     
  -     public SpyMessage queueReceive(Queue queue, long wait) throws Exception, 
RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(QueueReceive);
  -                     out.writeObject(queue);
  -                     out.writeLong(wait);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             return (SpyMessage)waitAnswer();
  -     }       
  -     
  -     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws Exception, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(QueueReceive);
  -                     out.writeObject(queue);
  -                     out.writeLong(wait);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             return (SpyMessage)waitAnswer();
  -     }
  -     
  -     
  -     public void setSpyDistributedConnection(SpyDistributedConnection dest) throws 
RemoteException {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte( SetSpyDistributedConnection );
  -                     out.writeObject(dest);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }               
  -             waitAnswer();
  -     }
  -     
   }
  
  
  
  1.12      +28 -10    
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
  
  Index: ConnectionReceiverOILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- ConnectionReceiverOILClient.java  2000/11/29 17:06:36     1.11
  +++ ConnectionReceiverOILClient.java  2000/12/12 05:58:50     1.12
  @@ -1,3 +1,9 @@
  +/*
  + * spyderMQ, the OpenSource JMS implementation
  + *
  + * Distributable under GPL license.
  + * See terms of license at gnu.org.
  + */
   package org.spydermq.distributed.server;
   
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
  @@ -5,6 +11,7 @@
   import org.spydermq.SpyMessage;
   import org.spydermq.Log;
   import org.spydermq.SpyConnection;
  +
   import java.rmi.RemoteException;
   import java.io.ObjectOutputStream;
   import java.io.ObjectInputStream;
  @@ -18,6 +25,14 @@
   import java.net.InetAddress;
   import java.net.SocketException;
   
  +/**
  + *   The UIL implementation of the ConnectionReceiver object
  + *      
  + *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
  + * 
  + *   @version $Revision: 1.12 $
  + */
   public class ConnectionReceiverOILClient
        implements ConnectionReceiver, Serializable
   {
  @@ -27,11 +42,9 @@
        static final int CLOSE = 4;             
        
        private transient Socket socket;
  -
  -
        private transient ObjectOutputStream out;
        private transient ObjectInputStream in;
  -     
  +
        private int port;
        private InetAddress addr;
        
  @@ -64,11 +77,11 @@
                        int val=in.readByte();
                        switch(val) {
                        case 1:                         
  -                             String st=(String)in.readObject();
  -                             throwException = new RemoteException(st);
  +                             Exception e=(Exception)in.readObject();
  +                             throwException = new RemoteException("", e);
                                break;
                        case 2:
  -                             st=(String)in.readObject();
  +                             String st=(String)in.readObject();
                                throwException = new 
org.spydermq.NoReceiverException(st);
                        }
                } catch (IOException e) {            
  @@ -83,7 +96,7 @@
        
        public void receive(SpyDestination dest,SpyMessage mes) throws Exception
        {               
  -             if (socket==null) createConnection();
  +             checkSocket();
                out.writeByte(RECEIVE);
                out.writeObject(dest);
                out.writeObject(mes);
  @@ -92,7 +105,7 @@
                
        public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
Exception      
        {
  -             if (socket==null) createConnection();
  +             checkSocket();
                out.writeByte(RECEIVE_MULTIPLE);
                out.writeObject(dest);
                out.writeInt(mes.length);
  @@ -103,7 +116,7 @@
        
        public void deleteTemporaryDestination(SpyDestination dest) throws Exception
        {
  -             if (socket==null) createConnection();
  +             checkSocket();
                out.writeByte(DELETE_TEMPORARY_DESTINATION);
                out.writeObject(dest);
                waitAnswer();
  @@ -111,9 +124,14 @@
        
        public void close() throws Exception
        {
  -             if (socket==null) createConnection();
  +             checkSocket();
                out.writeByte(CLOSE);
                waitAnswer();
        }
        
  +     protected void checkSocket() throws Exception
  +     {
  +             if (socket==null) createConnection();
  +     }
  +
   }
  
  
  
  1.4       +4 -4      
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java
  
  Index: ConnectionReceiverUILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ConnectionReceiverUILClient.java  2000/11/29 17:06:36     1.3
  +++ ConnectionReceiverUILClient.java  2000/12/12 05:58:51     1.4
  @@ -11,6 +11,7 @@
   import org.spydermq.SpyMessage;
   import org.spydermq.Log;
   import org.spydermq.SpyConnection;
  +
   import java.rmi.RemoteException;
   import java.io.ObjectOutputStream;
   import java.io.ObjectInputStream;
  @@ -25,15 +26,14 @@
   import java.net.SocketException;
   
   /**
  - *   The OIL implementation of the ConnectionReceiver object
  + *   The UIL implementation of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
  -import org.spydermq.multiplexor.SocketMultiplexor;public class 
ConnectionReceiverUILClient
  -     implements ConnectionReceiver, Serializable
  +import org.spydermq.multiplexor.SocketMultiplexor;public class 
ConnectionReceiverUILClient implements ConnectionReceiver, Serializable
   {
        static final int RECEIVE = 1;
        static final int RECEIVE_MULTIPLE = 2;
  
  
  
  1.2       +6 -0      
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImplMBean.java
  
  Index: DistributedJMSServerRMIImplMBean.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImplMBean.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- DistributedJMSServerRMIImplMBean.java     2000/06/14 23:21:00     1.1
  +++ DistributedJMSServerRMIImplMBean.java     2000/12/12 05:58:51     1.2
  @@ -1,3 +1,9 @@
  +/*
  + * spyderMQ, the OpenSource JMS implementation
  + *
  + * Distributable under GPL license.
  + * See terms of license at gnu.org.
  + */
   package org.spydermq.distributed.server;
   
   public interface DistributedJMSServerRMIImplMBean
  
  
  
  1.5       +21 -25    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java
  
  Index: DistributedJMSServerUIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- DistributedJMSServerUIL.java      2000/11/29 17:06:36     1.4
  +++ DistributedJMSServerUIL.java      2000/12/12 05:58:51     1.5
  @@ -12,14 +12,18 @@
   import javax.jms.Queue;
   import javax.jms.TemporaryTopic;
   import javax.jms.TemporaryQueue;
  +
   import org.spydermq.SpyMessage;
  -import org.spydermq.JMSServerQueue;
   import org.spydermq.SpyDestination;
  -import org.spydermq.JMSServer;
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.Log;
  +import org.spydermq.SpyAcknowledgementItem;
  +import org.spydermq.server.JMSServer;
  +import org.spydermq.Transaction;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  +import org.spydermq.multiplexor.SocketMultiplexor;
  +
   import java.rmi.RemoteException; 
   import java.net.ServerSocket;
   import java.net.Socket;
  @@ -31,21 +35,21 @@
   import java.io.IOException;
   
   /**
  - *   The OIL implementation of the DistributedJMSServer object
  + *   The UIL implementation of the DistributedJMSServer object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    *   @author Hiram Chirino ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.4 $
  + *   @version $Revision: 1.5 $
    */
  -import org.spydermq.multiplexor.SocketMultiplexor;import 
org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerUIL
  +public class DistributedJMSServerUIL
        implements Runnable, DistributedJMSServerSetup, DistributedJMSServerUILMBean
   {
   
        // Attributes ----------------------------------------------------
   
        //The server implementation
  -     private static JMSServer server;
  +     protected static JMSServer server;
        
        // Constructor ---------------------------------------------------
        
  @@ -71,8 +75,9 @@
        static final int ConnectionListening = 13;
        static final int Acknowledge = 14;      
        static final int SetSpyDistributedConnection = 15;
  +     static final int Transact = 16;
        
  -     private ServerSocket serverSocket;
  +     protected ServerSocket serverSocket;
   
        void exportObject()     
        {
  @@ -131,13 +136,13 @@
                                                result=server.getID();
                                                break;
                                        case NewMessage:                               
                 
  -                                             
newMessage((String)in.readObject(),in.readInt(),in);
  +                                             
server.addMessage(spyDistributedConnection, (SpyMessage)in.readObject());
                                                break;
                                        case Subscribe:
  -                                             
server.subscribe((Destination)in.readObject(),spyDistributedConnection);
  +                                             
server.subscribe(spyDistributedConnection, (Destination)in.readObject());
                                                break;
                                        case Unsubscribe: 
  -                                             
server.unsubscribe((Destination)in.readObject(),spyDistributedConnection);
  +                                             
server.unsubscribe(spyDistributedConnection,(Destination)in.readObject());
                                                break;
                                        case CreateTopic: 
                                                
result=(Topic)server.createTopic((String)in.readObject());
  @@ -162,16 +167,13 @@
                                                
server.checkID((String)in.readObject());
                                                break;
                                        case QueueReceive:
  -                                             
result=server.queueReceive((Queue)in.readObject(), 
in.readLong(),spyDistributedConnection);
  +                                             
result=server.queueReceive(spyDistributedConnection,(Queue)in.readObject(), 
in.readLong());
                                                break;
                                        case ConnectionListening: 
  -                                             
server.connectionListening(in.readBoolean(),(Destination)in.readObject(),spyDistributedConnection);
  +                                             
server.connectionListening(spyDistributedConnection,in.readBoolean(),(Destination)in.readObject());
                                                break;
                                        case Acknowledge:
  -                                             SpyAcknowledgementItem items[] = new 
SpyAcknowledgementItem[in.readInt()];
  -                                             for( int i=0; i < items.length; i++ )
  -                                                     items[i] = 
(SpyAcknowledgementItem)in.readObject();
  -                                             server.acknowledge(items, 
in.readBoolean(),spyDistributedConnection);
  +                                             
server.acknowledge(spyDistributedConnection, (SpyAcknowledgementItem)in.readObject());
                                                break;
                                        case SetSpyDistributedConnection: 
                                                spyDistributedConnection = 
(SpyDistributedConnection)in.readObject();
  @@ -180,6 +182,9 @@
                                                        
((ConnectionReceiverUILClient)spyDistributedConnection.cr).createConnection();
                                                }
                                                break;
  +                                     case Transact:
  +                                             
server.transact(spyDistributedConnection, (Transaction)in.readObject());
  +                                             break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
  @@ -235,16 +240,7 @@
                Log.error("Closing socket: "+st);
                Log.error(e);
        }
  -     
  -     void newMessage(String id,int nb,ObjectInputStream in) throws Exception
  -     {
  -             SpyMessage mes[] = new SpyMessage[nb];
  -             for(int i=0;i<nb;i++) 
  -                     mes[i]=(SpyMessage)in.readObject();                     
                
  -             server.newMessage(mes, id);
  -     }
  -     
        // --   
        public DistributedJMSServer createClient() throws Exception
        {               
  
  
  
  1.3       +20 -24    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java
  
  Index: DistributedJMSServerRMI.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- DistributedJMSServerRMI.java      2000/11/19 20:00:01     1.2
  +++ DistributedJMSServerRMI.java      2000/12/12 05:58:51     1.3
  @@ -5,7 +5,6 @@
    * See terms of license at gnu.org.
    */
   package org.spydermq.distributed.server;
  -
   import javax.jms.JMSException;
   import javax.jms.Destination;
   import javax.jms.Topic;
  @@ -19,32 +18,29 @@
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.SpyAcknowledgementItem;
  -
   /**
  - *   The RMI interface of the DistributedJMSServer object
  + *The RMI interface of the DistributedJMSServer object
    *      
  - *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *@author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.2 $
  + *@version $Revision: 1.3 $
    */
  -public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote
  -{  
  -             
  +public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote {
        // Public --------------------------------------------------------
  -     public String getID() throws JMSException, RemoteException;
  -     public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException;
  -     public Topic createTopic(String dest) throws JMSException, RemoteException;
  -     public Queue createQueue(String dest) throws JMSException, RemoteException;
  -     public void deleteTemporaryDestination(SpyDestination dest) throws 
JMSException, RemoteException;
  -     public void checkID(String ID) throws JMSException, RemoteException;
  -     public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection 
newSpyDistributedConnection) throws RemoteException;      
  -     public void acknowledge(SpyAcknowledgementItem[] items, boolean 
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException;        
  -     public void connectionClosing(SpyDistributedConnection dc) throws 
JMSException, RemoteException;        
  -     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws Exception, RemoteException;
  -     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException, RemoteException;      
  -     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException, RemoteException;      
  -     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws Exception, RemoteException;   
  -     public void subscribe(Destination dest, SpyDistributedConnection dc) throws 
JMSException, RemoteException;      
  -     public void unsubscribe(Destination dest, SpyDistributedConnection dc) throws 
JMSException, RemoteException;
  -     
  +     public String getID() throws RemoteException, Exception;
  +     public void checkID(String ID) throws RemoteException, Exception;
  +     public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection 
newSpyDistributedConnection) throws RemoteException, Exception;
  +     public void connectionClosing(SpyDistributedConnection dc) throws 
RemoteException, Exception;
  +     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
RemoteException, Exception;
  +     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
RemoteException, Exception;
  +     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item) throws RemoteException, Exception;
  +     public void addMessage(SpyDistributedConnection dc, SpyMessage message) throws 
RemoteException, Exception;
  +     public void connectionListening(SpyDistributedConnection dc, boolean mode, 
Destination dest) throws RemoteException, Exception;
  +     public Queue createQueue(SpyDistributedConnection dc, String dest) throws 
RemoteException, Exception;
  +     public Topic createTopic(SpyDistributedConnection dc, String dest) throws 
RemoteException, Exception;
  +     public void deleteTemporaryDestination(SpyDistributedConnection dc, 
SpyDestination dest) throws RemoteException, Exception;
  +     public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long 
wait) throws RemoteException, Exception;
  +     public void subscribe(SpyDistributedConnection dc, Destination dest) throws 
RemoteException, Exception;
  +     public void transact(SpyDistributedConnection dc, org.spydermq.Transaction t) 
throws RemoteException, Exception;
  +     public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws 
RemoteException, Exception;
   }
  
  
  
  1.11      +11 -3     
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
  
  Index: ConnectionReceiverOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
  retrieving revision 1.10
  retrieving revision 1.11
  diff -u -r1.10 -r1.11
  --- ConnectionReceiverOIL.java        2000/11/29 17:06:36     1.10
  +++ ConnectionReceiverOIL.java        2000/12/12 05:58:51     1.11
  @@ -8,18 +8,19 @@
   
   import javax.jms.Destination;
   import javax.jms.JMSException;
  -import org.spydermq.SpyConnection;
   
  +import org.spydermq.SpyConnection;
   import org.spydermq.SpyMessage;
   import org.spydermq.SpySession;
  -
   import org.spydermq.SpyDestination;
   import org.spydermq.SpyTopicConnection;
   import org.spydermq.SpyQueueSession;
   import org.spydermq.Log;
   import org.spydermq.NoReceiverException;
  +import org.spydermq.SpyMessageConsumer;
   import org.spydermq.distributed.interfaces.ConnectionReceiver;
   import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
  +
   import java.util.Hashtable;
   import java.util.HashSet;
   import java.util.Iterator;
  @@ -35,7 +36,14 @@
   import java.io.ObjectOutputStream;
   import java.io.IOException;
   
  -import org.spydermq.SpyMessageConsumer;
  +/**
  + *   The OIL implementation of the ConnectionReceiver object
  + *      
  + *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
  + * 
  + *   @version $Revision: 1.11 $
  + */
   public class ConnectionReceiverOIL 
        implements Runnable, ConnectionReceiverSetup
   {
  
  
  
  1.4       +144 -199  
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java
  
  Index: DistributedJMSServerUILClient.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- DistributedJMSServerUILClient.java        2000/11/29 17:06:36     1.3
  +++ DistributedJMSServerUILClient.java        2000/12/12 05:58:52     1.4
  @@ -12,11 +12,17 @@
   import javax.jms.Queue;
   import javax.jms.TemporaryTopic;
   import javax.jms.TemporaryQueue;
  +
   import org.spydermq.SpyMessage;
   import org.spydermq.SpyDestination;
  -import org.spydermq.JMSServer;
   import org.spydermq.Log;
   import org.spydermq.SpyDistributedConnection;
  +import org.spydermq.SpyAcknowledgementItem;
  +import org.spydermq.Transaction;
  +import org.spydermq.server.JMSServer;
  +import org.spydermq.distributed.interfaces.DistributedJMSServer;
  +import org.spydermq.multiplexor.SocketMultiplexor;
  +
   import java.rmi.RemoteException;
   import java.io.ObjectOutputStream;
   import java.io.ObjectInputStream;
  @@ -26,20 +32,17 @@
   import java.io.Serializable;
   import java.net.Socket;
   import java.net.InetAddress;
  -import org.spydermq.distributed.interfaces.DistributedJMSServer;
   
   /**
  - *   The OIL implementation of the DistributedJMSServer object
  - *      
  - *   @author Norbert Lataille ([EMAIL PROTECTED])
  - *   @author Hiram Chirino ([EMAIL PROTECTED])
  - * 
  - *   @version $Revision: 1.3 $
  + *The UIL implementation of the DistributedJMSServer object
  + *
  + *@author Norbert Lataille ([EMAIL PROTECTED])
  + *@author Hiram Chirino ([EMAIL PROTECTED])
  + *
  + *@version $Revision: 1.4 $
    */
  -import org.spydermq.multiplexor.SocketMultiplexor;import 
org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerUILClient
  -     implements DistributedJMSServer, Serializable
  -{
  -     
  +public class DistributedJMSServerUILClient implements DistributedJMSServer, 
Serializable {
  +
        static final int GetID = 1;
        static final int NewMessage = 2;
        static final int Subscribe = 3;
  @@ -51,61 +54,49 @@
        static final int ConnectionClouting = 9;
        static final int DeleteTemporaryDestination = 10;
        static final int CheckID = 11;
  -     static final int QueueReceive = 12;             
  +     static final int QueueReceive = 12;
        static final int ConnectionListening = 13;
  -     static final int Acknowledge = 14;                      
  -     static final int SetSpyDistributedConnection = 15;      
  -     
  -             
  +     static final int Acknowledge = 14;
  +     static final int SetSpyDistributedConnection = 15;
  +     static final int Transact = 16;
  +
        //Remote stuff
  -     private int port;
  -     private InetAddress addr;
  -     
  -     private transient Socket socket;
  -     transient SocketMultiplexor mSocket;    
  -
  -     private transient ObjectOutputStream out;
  -     private transient ObjectInputStream in;
  -     
  -     
  -     public DistributedJMSServerUILClient(InetAddress addr,int port)
  -     {
  -             socket=null;
  -             this.port=port;
  -             this.addr=addr;
  -     }
  -     
  -     void createConnection() throws RemoteException
  -     {
  -             try {
  -                     
  -                     System.out.println("ClientUIL initializing");
  -                     socket=new Socket(addr,port);
  -                     mSocket = new SocketMultiplexor( socket );
  +     protected int port;
  +     protected InetAddress addr;
  +     protected transient SocketMultiplexor mSocket;
  +     protected transient ObjectOutputStream out;
  +     protected transient ObjectInputStream in;
   
  -                     out=new ObjectOutputStream(new 
BufferedOutputStream(mSocket.getOutputStream(1)));
  +     public DistributedJMSServerUILClient(InetAddress addr, int port) {
  +             mSocket = null;
  +             this.port = port;
  +             this.addr = addr;
  +     }
  +
  +     protected void createConnection() throws RemoteException {
  +             try {
  +                     Socket socket = new Socket(addr, port);
  +                     mSocket = new SocketMultiplexor(socket);
  +                     out = new ObjectOutputStream(new 
BufferedOutputStream(mSocket.getOutputStream(1)));
                        out.flush();
  -                     
  -                     in=new ObjectInputStream(new 
BufferedInputStream(mSocket.getInputStream(1)));
  -                     System.out.println("ClientUIL initialized");
  -                     
  +                     in = new ObjectInputStream(new 
BufferedInputStream(mSocket.getInputStream(1)));
                } catch (Exception e) {
                        failure(e);
                }
        }
  -     
  -     public Object waitAnswer() throws RemoteException
  -     {
  +
  +     public Object waitAnswer() throws RemoteException {
                try {
                        out.reset();
                        out.flush();
  -                     int val=in.readByte();
  -                     if (val==0) return null;
  -                     if (val==1) {
  +                     int val = in.readByte();
  +                     if (val == 0)
  +                     return null;
  +                     if (val == 1) {
                                return in.readObject();
                        } else {
  -                             Exception e=(Exception)in.readObject();
  -                             throw new RemoteException("",e);        
  +                             Exception e = (Exception) in.readObject();
  +                             throw new RemoteException("", e);
                        }
                } catch (RemoteException e) {
                        Log.log(e);
  @@ -114,157 +105,104 @@
                        failure(e);
                        return null;
                }
  -             
  -     }       
  -     
  -     void failure(Exception e) throws RemoteException
  -     {
  +     }
  +
  +     void failure(Exception e) throws RemoteException {
                Log.error(e);
                throw new RemoteException("Cannot contact the remote object");
  -     }       
  -     
  -     //--- Remote Calls
  -     
  -     public void newMessage(SpyMessage val[],String id) throws JMSException, 
RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  -             try {
  -                     out.writeByte(NewMessage);
  -                     out.writeObject(id);
  -                     out.writeInt(val.length);
  -                     for(int i=0;i<val.length;i++)
  -                             out.writeObject(val[i]);
  -             } catch (IOException e) {
  -                     failure(e);
  -             }
  -             
  -             waitAnswer();
        }
   
  -     public String getID() throws Exception
  -     {
  -             if (socket==null) createConnection();
  -             
  +     public String getID() throws Exception {
  +             checkConnection();
                try {
                        out.writeByte(GetID);
                } catch (IOException e) {
                        failure(e);
                }
  -             
  -             return (String)waitAnswer();
  +             return (String) waitAnswer();
        }
  -             
  -
  -     
   
  -     
  -     public Topic createTopic(String dest) throws JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +     public void checkID(String ID) throws JMSException, RemoteException {
  +             checkConnection();
                try {
  -                     out.writeByte(CreateTopic);
  -                     out.writeObject(dest);
  +                     out.writeByte(CheckID);
  +                     out.writeObject(ID);
                } catch (IOException e) {
                        failure(e);
                }
  -             
  -             return (Topic)waitAnswer();
  +             waitAnswer();
        }
  -     
  -     public Queue createQueue(String dest) throws JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +
  +     public void setSpyDistributedConnection(SpyDistributedConnection dest) throws 
RemoteException {
  +             checkConnection();
                try {
  -                     out.writeByte(CreateQueue);
  +                     out.writeByte(SetSpyDistributedConnection);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
                }
  -             
  -             return (Queue)waitAnswer();
  +             waitAnswer();
        }
  -     
  -
  -     
  -
  -     
   
  -     
  -     public void deleteTemporaryDestination(SpyDestination dest) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +     public void connectionClosing(SpyDistributedConnection dc) throws 
JMSException, RemoteException {
  +             checkConnection();
                try {
  -                     out.writeByte(DeleteTemporaryDestination);
  -                     out.writeObject(dest);
  +                     out.writeByte(ConnectionClouting);
                } catch (IOException e) {
                        failure(e);
                }
  -             
                waitAnswer();
        }
  -     
  -     public void checkID(String ID) throws JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +
  +     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException, RemoteException {
  +             checkConnection();
                try {
  -                     out.writeByte(CheckID);
  -                     out.writeObject(ID);
  +                     out.writeByte(GetTemporaryQueue);
                } catch (IOException e) {
                        failure(e);
                }
  -             
  -             waitAnswer();
  +             return (TemporaryQueue) waitAnswer();
        }
  -     
  -     public void setSpyDistributedConnection(SpyDistributedConnection dest) throws 
RemoteException {
  -             if (socket==null) createConnection();
  -             
  +
  +     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException, RemoteException {
  +             checkConnection();
                try {
  -                     out.writeByte( SetSpyDistributedConnection );
  -                     out.writeObject(dest);
  +                     out.writeByte(GetTemporaryTopic);
                } catch (IOException e) {
                        failure(e);
  -             }               
  -             waitAnswer();
  -     }       
  -     
  -     public void acknowledge(SpyAcknowledgementItem item[],boolean 
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException {
  -             if (socket==null) createConnection();
  -             
  +             }
  +             return (TemporaryTopic) waitAnswer();
  +     }
  +
  +     public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem 
item) throws JMSException, RemoteException {
  +             checkConnection();
                try {
                        out.writeByte(Acknowledge);
  -                     out.writeInt(item.length);
  -                     for( int i=0; i< item.length; i++ )
  -                             out.writeObject(item[i]);
  -                     out.writeBoolean(isAck);
  +                     out.writeObject(item);
                } catch (IOException e) {
                        failure(e);
                }
  -             
                waitAnswer();
  -     }       
  -     
  -     public void connectionClosing(SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +     }
  +
  +     public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws 
JMSException, RemoteException {
  +             checkConnection();
                try {
  -                     out.writeByte(ConnectionClouting);
  +                     out.writeByte(NewMessage);
  +                     out.writeObject(val);
                } catch (IOException e) {
                        failure(e);
                }
                waitAnswer();
  -     }       
  -     
  -     public void connectionListening(boolean mode,Destination 
dest,SpyDistributedConnection dc) throws Exception, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +     }
  +
  +     protected void checkConnection() throws RemoteException {
  +             if (mSocket == null)
  +             createConnection();
  +     }
  +
  +     public void connectionListening(SpyDistributedConnection dc, boolean mode, 
Destination dest) throws Exception, RemoteException {
  +             checkConnection();
                try {
                        out.writeByte(ConnectionListening);
                        out.writeBoolean(mode);
  @@ -272,77 +210,84 @@
                } catch (IOException e) {
                        failure(e);
                }
  -             
                waitAnswer();
  -     }       
  -     
  -     public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +     }
  +
  +     public Queue createQueue(SpyDistributedConnection dc, String dest) throws 
JMSException, RemoteException {
  +             checkConnection();
                try {
  -                     out.writeByte(GetTemporaryQueue);
  +                     out.writeByte(CreateQueue);
  +                     out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
                }
  -             
  -             return (TemporaryQueue)waitAnswer();            
  -     }       
  -     
  -     public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +             return (Queue) waitAnswer();
  +     }
  +
  +     public Topic createTopic(SpyDistributedConnection dc, String dest) throws 
JMSException, RemoteException {
  +             checkConnection();
                try {
  -                     out.writeByte(GetTemporaryTopic);
  +                     out.writeByte(CreateTopic);
  +                     out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
                }
  -             
  -             return (TemporaryTopic)waitAnswer();            
  -     }       
  -     
  -     public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection 
dc) throws Exception, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +             return (Topic) waitAnswer();
  +     }
  +
  +     public void deleteTemporaryDestination(SpyDistributedConnection dc, 
SpyDestination dest) throws JMSException, RemoteException {
  +             checkConnection();
                try {
  +                     out.writeByte(DeleteTemporaryDestination);
  +                     out.writeObject(dest);
  +             } catch (IOException e) {
  +                     failure(e);
  +             }
  +             waitAnswer();
  +     }
  +
  +     public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long 
wait) throws Exception, RemoteException {
  +             checkConnection();
  +             try {
                        out.writeByte(QueueReceive);
                        out.writeObject(queue);
                        out.writeLong(wait);
                } catch (IOException e) {
                        failure(e);
                }
  -             
  -             return (SpyMessage)waitAnswer();
  -     }       
  -     
  -     public void subscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +             return (SpyMessage) waitAnswer();
  +     }
  +
  +     public void subscribe(SpyDistributedConnection dc, Destination dest) throws 
JMSException, RemoteException {
  +             checkConnection();
                try {
                        out.writeByte(Subscribe);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
  +             }
  +             waitAnswer();
  +     }
  +
  +     public void transact(org.spydermq.SpyDistributedConnection dc, Transaction t) 
throws JMSException, RemoteException {
  +             checkConnection();
  +             try {
  +                     out.writeByte(Transact);
  +                     out.writeObject(t);
  +             } catch (IOException e) {
  +                     failure(e);
                }
  -             
                waitAnswer();
  -     }       
  -     
  -     public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws 
JMSException, RemoteException
  -     {
  -             if (socket==null) createConnection();
  -             
  +     }
  +
  +     public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws 
JMSException, RemoteException {
  +             checkConnection();
                try {
                        out.writeByte(Unsubscribe);
                        out.writeObject(dest);
                } catch (IOException e) {
                        failure(e);
                }
  -             
                waitAnswer();
        }
  -     
   }
  
  
  
  1.7       +35 -85    
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
  
  Index: DistributedJMSServerOIL.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- DistributedJMSServerOIL.java      2000/11/29 17:06:36     1.6
  +++ DistributedJMSServerOIL.java      2000/12/12 05:58:52     1.7
  @@ -1,3 +1,9 @@
  +/*
  + * spyderMQ, the OpenSource JMS implementation
  + *
  + * Distributable under GPL license.
  + * See terms of license at gnu.org.
  + */
   package org.spydermq.distributed.server;
   
   import javax.jms.JMSException;
  @@ -6,14 +12,16 @@
   import javax.jms.Queue;
   import javax.jms.TemporaryTopic;
   import javax.jms.TemporaryQueue;
  +
   import org.spydermq.SpyMessage;
  -import org.spydermq.JMSServerQueue;
   import org.spydermq.SpyDestination;
  -import org.spydermq.JMSServer;
   import org.spydermq.SpyDistributedConnection;
   import org.spydermq.Log;
  +import org.spydermq.SpyAcknowledgementItem;
   import org.spydermq.distributed.interfaces.DistributedJMSServer;
   import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
  +import org.spydermq.Transaction;
  +
   import java.rmi.RemoteException; 
   import java.net.ServerSocket;
   import java.net.Socket;
  @@ -23,54 +31,23 @@
   import java.io.ObjectInputStream;
   import java.io.BufferedInputStream;
   import java.io.IOException;
  -import org.spydermq.SpyAcknowledgementItem;
   
  -public class DistributedJMSServerOIL
  -     implements Runnable, DistributedJMSServerSetup, DistributedJMSServerOILMBean
  +/**
  + *   The OIL implementation of the DistributedJMSServer object
  + *      
  + *   @author Norbert Lataille ([EMAIL PROTECTED])
  + *   @author Hiram Chirino ([EMAIL PROTECTED])
  + * 
  + *   @version $Revision: 1.7 $
  + */
  +public class DistributedJMSServerOIL extends DistributedJMSServerUIL
  +     implements DistributedJMSServerOILMBean
   {
  -
  -     // Attributes ----------------------------------------------------
  -
  -     //The server implementation
  -     private static JMSServer server;
  -     
  -     // Constructor ---------------------------------------------------
        
  -     public DistributedJMSServerOIL() 
  -     {
  -             exportObject();
  +     public DistributedJMSServerOIL() { 
  +     // Default constructor
        }
   
  -     // Internals -----------------------------------------------------
  -     
  -     static final int GetID = 1;
  -     static final int NewMessage = 2;
  -     static final int Subscribe = 3;
  -     static final int Unsubscribe = 4;
  -     static final int CreateTopic = 5;
  -     static final int CreateQueue = 6;
  -     static final int GetTemporaryTopic = 7;
  -     static final int GetTemporaryQueue = 8;
  -     static final int ConnectionClosing = 9;
  -     static final int DeleteTemporaryDestination = 10;
  -     static final int CheckID = 11;
  -     static final int QueueReceive = 12;     
  -     static final int ConnectionListening = 13;
  -     static final int Acknowledge = 14;      
  -     static final int SetSpyDistributedConnection = 15;
  -     
  -     private ServerSocket serverSocket;
  -
  -     void exportObject()     
  -     {
  -             try {
  -                      serverSocket = new ServerSocket(0);
  -                      new Thread(this).start();
  -             } catch (IOException e) {
  -                     failure("Initialization",e);
  -             }
  -     }
  -     
        public void run()
        {
                Socket socket = null;
  @@ -93,14 +70,12 @@
                        failure("Initialisation",e);
                        return;
                }
  -
  +             
                while (!closed) {
   
                        try {
                                code=in.readByte();             
                        } catch (IOException e) {
  -                             if( closed )
  -                                     break;
                                Log.notice("Command read");
                                Log.notice(e);
                                break;
  @@ -116,13 +91,13 @@
                                                result=server.getID();
                                                break;
                                        case NewMessage:                               
                 
  -                                             
newMessage((String)in.readObject(),in.readInt(),in);
  +                                             
server.addMessage(spyDistributedConnection, (SpyMessage)in.readObject());
                                                break;
                                        case Subscribe:
  -                                             
server.subscribe((Destination)in.readObject(),spyDistributedConnection);
  +                                             
server.subscribe(spyDistributedConnection, (Destination)in.readObject());
                                                break;
                                        case Unsubscribe: 
  -                                             
server.unsubscribe((Destination)in.readObject(),spyDistributedConnection);
  +                                             
server.unsubscribe(spyDistributedConnection,(Destination)in.readObject());
                                                break;
                                        case CreateTopic: 
                                                
result=(Topic)server.createTopic((String)in.readObject());
  @@ -138,7 +113,7 @@
                                                break;
                                        case ConnectionClosing: 
                                                
server.connectionClosing(spyDistributedConnection,null);
  -                                             closed=true;
  +                                             closed = true;
                                                break;
                                        case DeleteTemporaryDestination: 
                                                
server.deleteTemporaryDestination((SpyDestination)in.readObject());
  @@ -147,20 +122,20 @@
                                                
server.checkID((String)in.readObject());
                                                break;
                                        case QueueReceive:
  -                                             
result=server.queueReceive((Queue)in.readObject(), 
in.readLong(),spyDistributedConnection);
  +                                             
result=server.queueReceive(spyDistributedConnection,(Queue)in.readObject(), 
in.readLong());
                                                break;
                                        case ConnectionListening: 
  -                                             
server.connectionListening(in.readBoolean(),(Destination)in.readObject(),spyDistributedConnection);
  +                                             
server.connectionListening(spyDistributedConnection,in.readBoolean(),(Destination)in.readObject());
                                                break;
                                        case Acknowledge:
  -                                             SpyAcknowledgementItem items[] = new 
SpyAcknowledgementItem[in.readInt()];
  -                                             for( int i=0; i < items.length; i++ )
  -                                                     items[i] = 
(SpyAcknowledgementItem)in.readObject();
  -                                             server.acknowledge(items, 
in.readBoolean(),spyDistributedConnection);
  +                                             
server.acknowledge(spyDistributedConnection, (SpyAcknowledgementItem)in.readObject());
                                                break;
                                        case SetSpyDistributedConnection: 
                                                spyDistributedConnection = 
(SpyDistributedConnection)in.readObject();
                                                break;
  +                                     case Transact:
  +                                             
server.transact(spyDistributedConnection, (Transaction)in.readObject());
  +                                             break;
                                        default:
                                                throw new RemoteException("Bad method 
code !");
                                }
  @@ -189,16 +164,14 @@
   
                                try {
                                        out.writeByte(2);
  -                                     out.writeObject(e.getMessage());
  +                                     out.writeObject(e);
                                        out.reset();
                                        out.flush();
                                } catch (IOException e2) {
                                        failure("Result write",e2);
                                        break;                                  
  -                             }
  -                             
  +                             }                               
                        }
  -
                }
                
                try {
  @@ -210,33 +183,10 @@
                        Log.log(e);
                }
        }
  -     
  -     void failure(String st,Exception e)
  -     {
  -             Log.error("Closing socket: "+st);
  -             Log.error(e);
  -     }
  -     
  -     void newMessage(String id,int nb,ObjectInputStream in) throws Exception
  -     {
  -             SpyMessage mes[] = new SpyMessage[nb];
  -             for(int i=0;i<nb;i++) 
  -                     mes[i]=(SpyMessage)in.readObject();                     
                
  -             server.newMessage(mes, id);
  -     }
  -     
  -     // --
  -     
        public DistributedJMSServer createClient() throws Exception
        {               
                return new 
DistributedJMSServerOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
        }
        
  -     public void setServer(JMSServer s)
  -     {
  -             server=s;
  -     }
  -     
  -
   }
  
  
  
  1.4       +5 -3      
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMI.java
  
  Index: ConnectionReceiverRMI.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMI.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- ConnectionReceiverRMI.java        2000/06/19 21:52:00     1.3
  +++ ConnectionReceiverRMI.java        2000/12/12 05:58:52     1.4
  @@ -8,18 +8,20 @@
   
   import javax.jms.Destination;
   import javax.jms.JMSException;
  +
   import org.spydermq.SpyMessage;
   import org.spydermq.SpyDestination;
  +import org.spydermq.distributed.interfaces.ConnectionReceiver;
  +
   import java.rmi.Remote;
   import java.rmi.RemoteException;
  -import org.spydermq.distributed.interfaces.ConnectionReceiver;
   
   /**
    *   The RMI interface of the ConnectionReceiver object
    *      
    *   @author Norbert Lataille ([EMAIL PROTECTED])
    * 
  - *   @version $Revision: 1.3 $
  + *   @version $Revision: 1.4 $
    */
   public interface ConnectionReceiverRMI 
        extends ConnectionReceiver, Remote
  @@ -27,7 +29,7 @@
        // Public --------------------------------------------------------
   
        //A message has arrived for the Connection 
  -    public void receive(SpyDestination b,SpyMessage c) throws RemoteException, 
JMSException;
  +     public void receive(SpyDestination b,SpyMessage c) throws RemoteException, 
JMSException;
        //Messages have arrived for the Connection 
        public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws 
RemoteException, JMSException;
        //One TemporaryDestination has been deleted
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/distributed/server/DistributedConnectionFactoryRMIImplMBean.java
  
  Index: DistributedConnectionFactoryRMIImplMBean.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.distributed.server;
  
  public interface DistributedConnectionFactoryRMIImplMBean
  {
  }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/distributed/server/DistributedConnectionFactoryRMI.java
  
  Index: DistributedConnectionFactoryRMI.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.distributed.server;
  
  import javax.jms.QueueConnection;
  import javax.jms.TopicConnection;
  
  import java.rmi.Remote;
  import java.rmi.RemoteException;
  
  import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
  
  /**
   *    The RMI interface of the DistributedConnectionFactory object
   *
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public interface DistributedConnectionFactoryRMI extends 
DistributedConnectionFactory, Remote
  { 
        // Public --------------------------------------------------------
  
        public QueueConnection createQueueConnection() throws Exception;
        public QueueConnection createQueueConnection(String userName, String password) 
throws Exception;
        public TopicConnection createTopicConnection() throws Exception;        
        public TopicConnection createTopicConnection(String userName, String password) 
throws Exception;
        
  }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/distributed/server/DistributedConnectionFactoryRMIImpl.java
  
  Index: DistributedConnectionFactoryRMIImpl.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.distributed.server;
  
  import java.rmi.RemoteException; 
  import java.rmi.server.UnicastRemoteObject;
  import javax.jms.QueueConnection;
  import javax.jms.QueueConnectionFactory;
  import javax.jms.JMSException;
  import org.spydermq.distributed.interfaces.DistributedJMSServer;
  import org.spydermq.SpyQueueConnection;
  import org.spydermq.security.SecurityManager;
  import java.util.Hashtable;
  
  import javax.jms.TopicConnectionFactory;
  import org.spydermq.SpyTopicConnection;
  import org.spydermq.server.JMSServer;
  import javax.jms.TopicConnection;
  
  /**
   *    The RMI implementation of the DistributedConnectionFactory object
   *
   *    @author Norbert Lataille ([EMAIL PROTECTED])
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class DistributedConnectionFactoryRMIImpl 
        extends UnicastRemoteObject 
        implements DistributedConnectionFactoryRMI, 
DistributedConnectionFactoryRMIImplMBean
  {
        // Attributes ----------------------------------------------------
   
        protected DistributedJMSServer server;
        private String crCN;
        protected SecurityManager securityManager;
  
        // Constructor ---------------------------------------------------
           
        public DistributedConnectionFactoryRMIImpl() throws RemoteException
        {
                super();
        }
  
        // Public --------------------------------------------------------
        
        public void setServer(DistributedJMSServer theServer)
        {
                server=theServer;
        }
        
        public void setCRClassName(String className)
        {
                crCN=className;
        }
        
        public void setSecurityManager(SecurityManager securityManager)
        {
                this.securityManager=securityManager; 
        }
  
        public QueueConnection createQueueConnection() throws JMSException
        {
                SpyQueueConnection obj=new SpyQueueConnection(server,null,crCN);
                return obj;
        }
  
        public QueueConnection createQueueConnection(String userName, String password) 
throws JMSException
        {
                String id=securityManager.checkUser(userName,password);
                return new SpyQueueConnection(server,id,crCN);
        }
                
        public TopicConnection createTopicConnection() throws JMSException
        {
                SpyTopicConnection obj=new SpyTopicConnection(server,null,crCN);
                return obj;
        }       
        
        public TopicConnection createTopicConnection(String userName, String password) 
throws JMSException
        {
                String id=securityManager.checkUser(userName,password);
                SpyTopicConnection obj=new SpyTopicConnection(server,id,crCN);         
 
                return obj;
        }       
        
        public void setConnectionReceiverClassName(String className)
        {
                crCN=className;
        }
  
  }
  
  
  

Reply via email to