User: hiram
Date: 00/12/11 21:58:54
Modified: src/java/org/spydermq/distributed/server
DistributedJMSServerRMIImpl.java
ConnectionReceiverRMIImpl.java
DistributedJMSServerOILClient.java
ConnectionReceiverOILClient.java
ConnectionReceiverUILClient.java
DistributedJMSServerRMIImplMBean.java
DistributedJMSServerUIL.java
DistributedJMSServerRMI.java
ConnectionReceiverOIL.java
DistributedJMSServerUILClient.java
DistributedJMSServerOIL.java
ConnectionReceiverRMI.java
Added: src/java/org/spydermq/distributed/server
DistributedConnectionFactoryRMIImplMBean.java
DistributedConnectionFactoryRMI.java
DistributedConnectionFactoryRMIImpl.java
Removed: src/java/org/spydermq/distributed/server
DistributedQueueConnectionFactoryRMI.java
DistributedTopicConnectionFactoryRMI.java
DistributedTopicConnectionFactoryRMIImplMBean.java
DistributedQueueConnectionFactoryRMIImpl.java
DistributedQueueConnectionFactoryRMIImplMBean.java
DistributedTopicConnectionFactoryRMIImpl.java
Log:
Several Changes:
- Invocation layer simplified by joining the DistributedQueueConnectionFactory and
DistributedTopicConnectionFactory into DistributedConnectionFactory
- Separated server code from client code (server code moved to org.spydermq.server)
- All publish() calls are now sync to the provider.
- Added a Transaction class to better represent a commit/rollback request to the
server.
- Now have an InvocationLayerFactory so that we can potentially load multiple
invocation layers (OIL/UIL/RMI) at the same time.
Revision Changes Path
1.5 +22 -26
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java
Index: DistributedJMSServerRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- DistributedJMSServerRMIImpl.java 2000/11/19 20:00:01 1.4
+++ DistributedJMSServerRMIImpl.java 2000/12/12 05:58:50 1.5
@@ -16,21 +16,15 @@
import java.rmi.RemoteException;
import org.spydermq.SpyMessage;
import org.spydermq.SpyDestination;
-import org.spydermq.JMSServer;
+
import org.spydermq.SpyDistributedConnection;
import org.spydermq.Log;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
import org.spydermq.SpyAcknowledgementItem;
-/**
- * The RMI implementation of the DistributedJMSServer object
- *
- * @author Norbert Lataille ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.4 $
- */
-public class DistributedJMSServerRMIImpl
+import org.spydermq.server.JMSServer;
+import org.spydermq.Transaction;public class DistributedJMSServerRMIImpl
extends UnicastRemoteObject
implements DistributedJMSServerRMI, DistributedJMSServerRMIImplMBean,
DistributedJMSServerSetup
{
@@ -52,22 +46,22 @@
return server.getID();
}
- public void newMessage(SpyMessage val[],String id) throws JMSException
+ public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws
JMSException
{
- server.newMessage(val,id);
+ server.addMessage(dc, val);
}
- public Topic createTopic(String dest) throws JMSException
+ public Topic createTopic(SpyDistributedConnection dc, String dest) throws
JMSException
{
return server.createTopic(dest);
}
- public Queue createQueue(String dest) throws JMSException
+ public Queue createQueue(SpyDistributedConnection dc, String dest) throws
JMSException
{
return server.createQueue(dest);
}
- public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
+ public void deleteTemporaryDestination(SpyDistributedConnection dc,
SpyDestination dest) throws JMSException
{
server.deleteTemporaryDestination(dest);
}
@@ -89,13 +83,13 @@
server=s;
}
- public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection
newSpyDistributedConnection) {
+ public void setSpyDistributedConnection(SpyDistributedConnection
newSpyDistributedConnection) {
// We cannot try to cache the dc since different dc's are going
// to access the same remote object via RMI
}
- public void acknowledge(SpyAcknowledgementItem[] items, boolean
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException{
- server.acknowledge(items, isAck, dc);
+ public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws JMSException, RemoteException{
+ server.acknowledge(dc, item);
}
public void connectionClosing(SpyDistributedConnection dc) throws JMSException
@@ -103,9 +97,9 @@
server.connectionClosing(dc,null);
}
- public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws JMSException
+ public void connectionListening(SpyDistributedConnection dc,boolean
mode,Destination dest) throws JMSException
{
- server.connectionListening(mode,dest,dc);
+ server.connectionListening(dc,mode,dest);
}
public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException
@@ -118,19 +112,21 @@
return server.getTemporaryTopic(dc);
}
- public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws JMSException
+ public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long
wait) throws JMSException
{
- return server.queueReceive(queue, wait, dc);
+ return server.queueReceive(dc,queue, wait);
}
- public void subscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException
+ public void subscribe(SpyDistributedConnection dc, Destination dest) throws
JMSException
{
- server.subscribe(dest,dc);
+ server.subscribe(dc,dest);
}
- public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException
+ public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws
JMSException
{
- server.unsubscribe(dest,dc);
+ server.unsubscribe(dc,dest);
}
-}
+ public void transact(org.spydermq.SpyDistributedConnection dc, Transaction t)
throws JMSException {
+ server.transact(dc,t);
+ }}
1.13 +8 -6
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
Index: ConnectionReceiverRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- ConnectionReceiverRMIImpl.java 2000/11/19 20:00:01 1.12
+++ ConnectionReceiverRMIImpl.java 2000/12/12 05:58:50 1.13
@@ -8,30 +8,32 @@
import javax.jms.Destination;
import javax.jms.JMSException;
-import org.spydermq.SpyConnection;
+import org.spydermq.SpyConnection;
import org.spydermq.SpyMessage;
import org.spydermq.SpySession;
-
import org.spydermq.SpyDestination;
import org.spydermq.SpyTopicConnection;
import org.spydermq.SpyQueueSession;
import org.spydermq.Log;
import org.spydermq.NoReceiverException;
+import org.spydermq.SpyMessageConsumer;
+import org.spydermq.distributed.interfaces.ConnectionReceiver;
+import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
+
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.Hashtable;
import java.util.HashSet;
import java.util.Iterator;
-import org.spydermq.distributed.interfaces.ConnectionReceiver;
-import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
-import org.spydermq.SpyMessageConsumer;/**
+/**
* The RMI implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class ConnectionReceiverRMIImpl
extends UnicastRemoteObject
1.5 +27 -288
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
Index: DistributedJMSServerOILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- DistributedJMSServerOILClient.java 2000/11/29 17:06:36 1.4
+++ DistributedJMSServerOILClient.java 2000/12/12 05:58:50 1.5
@@ -1,3 +1,9 @@
+/*
+ * spyderMQ, the OpenSource JMS implementation
+ *
+ * Distributable under GPL license.
+ * See terms of license at gnu.org.
+ */
package org.spydermq.distributed.server;
import javax.jms.JMSException;
@@ -6,11 +12,15 @@
import javax.jms.Queue;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
+
import org.spydermq.SpyMessage;
import org.spydermq.SpyDestination;
-import org.spydermq.JMSServer;
import org.spydermq.Log;
import org.spydermq.SpyDistributedConnection;
+import org.spydermq.distributed.interfaces.DistributedJMSServer;
+import org.spydermq.SpyAcknowledgementItem;
+import org.spydermq.server.JMSServer;
+
import java.rmi.RemoteException;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
@@ -20,47 +30,27 @@
import java.io.Serializable;
import java.net.Socket;
import java.net.InetAddress;
-import org.spydermq.distributed.interfaces.DistributedJMSServer;
-
-import org.spydermq.SpyAcknowledgementItem;public class
DistributedJMSServerOILClient
- implements DistributedJMSServer, Serializable
-{
-
- static final int GetID = 1;
- static final int NewMessage = 2;
- static final int Subscribe = 3;
- static final int Unsubscribe = 4;
- static final int CreateTopic = 5;
- static final int CreateQueue = 6;
- static final int GetTemporaryTopic = 7;
- static final int GetTemporaryQueue = 8;
- static final int ConnectionClosing = 9;
- static final int DeleteTemporaryDestination = 10;
- static final int CheckID = 11;
- static final int QueueReceive = 12;
- static final int ConnectionListening = 13;
- static final int Acknowledge = 14;
- static final int SetSpyDistributedConnection = 15;
-
- //Remote stuff
-
- private transient Socket socket;
-
- private transient ObjectOutputStream out;
- private transient ObjectInputStream in;
+/**
+ *The OIL implementation of the DistributedJMSServer object
+ *
+ *@author Norbert Lataille ([EMAIL PROTECTED])
+ *@author Hiram Chirino ([EMAIL PROTECTED])
+ *
+ *@version $Revision: 1.5 $
+ */
+public class DistributedJMSServerOILClient extends DistributedJMSServerUILClient
+ implements DistributedJMSServer, Serializable {
- private int port;
- private InetAddress addr;
+ Socket socket;
public DistributedJMSServerOILClient(InetAddress addr,int port)
{
+ super(addr,port);
socket=null;
- this.port=port;
- this.addr=addr;
}
- void createConnection() throws RemoteException
+ protected void createConnection() throws RemoteException
{
try {
socket=new Socket(addr,port);
@@ -72,260 +62,9 @@
}
}
- public Object waitAnswer() throws RemoteException
- {
- try {
- out.reset();
- out.flush();
- int val=in.readByte();
- if (val==0) return null;
- if (val==1) {
- return in.readObject();
- } else {
- String st=(String)in.readObject();
- throw new RemoteException(st);
- }
- } catch (Exception e) {
- failure(e);
- return null;
- }
-
- }
-
- void failure(Exception e) throws RemoteException
- {
- Log.error(e);
- throw new RemoteException("Cannot contact the remote object");
- }
-
- //--- Remote Calls
-
- public void newMessage(SpyMessage val[],String id) throws JMSException,
RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(NewMessage);
- out.writeObject(id);
- out.writeInt(val.length);
- for(int i=0;i<val.length;i++)
- out.writeObject(val[i]);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
+ protected void checkConnection() throws RemoteException {
+ if (socket == null)
+ createConnection();
}
- public String getID() throws Exception
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(GetID);
- } catch (IOException e) {
- failure(e);
- }
-
- return (String)waitAnswer();
- }
-
- public void subscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(Subscribe);
- out.writeObject(dest);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
- }
-
- public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(Unsubscribe);
- out.writeObject(dest);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
- }
-
- public Topic createTopic(String dest) throws JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(CreateTopic);
- out.writeObject(dest);
- } catch (IOException e) {
- failure(e);
- }
-
- return (Topic)waitAnswer();
- }
-
- public Queue createQueue(String dest) throws JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(CreateQueue);
- out.writeObject(dest);
- } catch (IOException e) {
- failure(e);
- }
-
- return (Queue)waitAnswer();
- }
-
- public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(GetTemporaryTopic);
- } catch (IOException e) {
- failure(e);
- }
-
- return (TemporaryTopic)waitAnswer();
- }
-
- public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(GetTemporaryQueue);
- } catch (IOException e) {
- failure(e);
- }
-
- return (TemporaryQueue)waitAnswer();
- }
-
- public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(ConnectionClosing);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
- }
-
- public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(DeleteTemporaryDestination);
- out.writeObject(dest);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
- }
-
- public void checkID(String ID) throws JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(CheckID);
- out.writeObject(ID);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
- }
-
-
- public void connectionListening(boolean mode,Destination dest,
SpyDistributedConnection dc) throws Exception, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(ConnectionListening);
- out.writeBoolean(mode);
- out.writeObject(dest);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
- }
-
- public void acknowledge(SpyAcknowledgementItem item[],boolean
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(Acknowledge);
- out.writeInt(item.length);
- for( int i=0; i< item.length; i++ )
- out.writeObject(item[i]);
- out.writeBoolean(isAck);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
- }
-
- public SpyMessage queueReceive(Queue queue, long wait) throws Exception,
RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(QueueReceive);
- out.writeObject(queue);
- out.writeLong(wait);
- } catch (IOException e) {
- failure(e);
- }
-
- return (SpyMessage)waitAnswer();
- }
-
- public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws Exception, RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(QueueReceive);
- out.writeObject(queue);
- out.writeLong(wait);
- } catch (IOException e) {
- failure(e);
- }
-
- return (SpyMessage)waitAnswer();
- }
-
-
- public void setSpyDistributedConnection(SpyDistributedConnection dest) throws
RemoteException {
- if (socket==null) createConnection();
-
- try {
- out.writeByte( SetSpyDistributedConnection );
- out.writeObject(dest);
- } catch (IOException e) {
- failure(e);
- }
- waitAnswer();
- }
-
}
1.12 +28 -10
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
Index: ConnectionReceiverOILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- ConnectionReceiverOILClient.java 2000/11/29 17:06:36 1.11
+++ ConnectionReceiverOILClient.java 2000/12/12 05:58:50 1.12
@@ -1,3 +1,9 @@
+/*
+ * spyderMQ, the OpenSource JMS implementation
+ *
+ * Distributable under GPL license.
+ * See terms of license at gnu.org.
+ */
package org.spydermq.distributed.server;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
@@ -5,6 +11,7 @@
import org.spydermq.SpyMessage;
import org.spydermq.Log;
import org.spydermq.SpyConnection;
+
import java.rmi.RemoteException;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
@@ -18,6 +25,14 @@
import java.net.InetAddress;
import java.net.SocketException;
+/**
+ * The UIL implementation of the ConnectionReceiver object
+ *
+ * @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
+ *
+ * @version $Revision: 1.12 $
+ */
public class ConnectionReceiverOILClient
implements ConnectionReceiver, Serializable
{
@@ -27,11 +42,9 @@
static final int CLOSE = 4;
private transient Socket socket;
-
-
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
-
+
private int port;
private InetAddress addr;
@@ -64,11 +77,11 @@
int val=in.readByte();
switch(val) {
case 1:
- String st=(String)in.readObject();
- throwException = new RemoteException(st);
+ Exception e=(Exception)in.readObject();
+ throwException = new RemoteException("", e);
break;
case 2:
- st=(String)in.readObject();
+ String st=(String)in.readObject();
throwException = new
org.spydermq.NoReceiverException(st);
}
} catch (IOException e) {
@@ -83,7 +96,7 @@
public void receive(SpyDestination dest,SpyMessage mes) throws Exception
{
- if (socket==null) createConnection();
+ checkSocket();
out.writeByte(RECEIVE);
out.writeObject(dest);
out.writeObject(mes);
@@ -92,7 +105,7 @@
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
Exception
{
- if (socket==null) createConnection();
+ checkSocket();
out.writeByte(RECEIVE_MULTIPLE);
out.writeObject(dest);
out.writeInt(mes.length);
@@ -103,7 +116,7 @@
public void deleteTemporaryDestination(SpyDestination dest) throws Exception
{
- if (socket==null) createConnection();
+ checkSocket();
out.writeByte(DELETE_TEMPORARY_DESTINATION);
out.writeObject(dest);
waitAnswer();
@@ -111,9 +124,14 @@
public void close() throws Exception
{
- if (socket==null) createConnection();
+ checkSocket();
out.writeByte(CLOSE);
waitAnswer();
}
+ protected void checkSocket() throws Exception
+ {
+ if (socket==null) createConnection();
+ }
+
}
1.4 +4 -4
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java
Index: ConnectionReceiverUILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ConnectionReceiverUILClient.java 2000/11/29 17:06:36 1.3
+++ ConnectionReceiverUILClient.java 2000/12/12 05:58:51 1.4
@@ -11,6 +11,7 @@
import org.spydermq.SpyMessage;
import org.spydermq.Log;
import org.spydermq.SpyConnection;
+
import java.rmi.RemoteException;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
@@ -25,15 +26,14 @@
import java.net.SocketException;
/**
- * The OIL implementation of the ConnectionReceiver object
+ * The UIL implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
-import org.spydermq.multiplexor.SocketMultiplexor;public class
ConnectionReceiverUILClient
- implements ConnectionReceiver, Serializable
+import org.spydermq.multiplexor.SocketMultiplexor;public class
ConnectionReceiverUILClient implements ConnectionReceiver, Serializable
{
static final int RECEIVE = 1;
static final int RECEIVE_MULTIPLE = 2;
1.2 +6 -0
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImplMBean.java
Index: DistributedJMSServerRMIImplMBean.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImplMBean.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- DistributedJMSServerRMIImplMBean.java 2000/06/14 23:21:00 1.1
+++ DistributedJMSServerRMIImplMBean.java 2000/12/12 05:58:51 1.2
@@ -1,3 +1,9 @@
+/*
+ * spyderMQ, the OpenSource JMS implementation
+ *
+ * Distributable under GPL license.
+ * See terms of license at gnu.org.
+ */
package org.spydermq.distributed.server;
public interface DistributedJMSServerRMIImplMBean
1.5 +21 -25
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java
Index: DistributedJMSServerUIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- DistributedJMSServerUIL.java 2000/11/29 17:06:36 1.4
+++ DistributedJMSServerUIL.java 2000/12/12 05:58:51 1.5
@@ -12,14 +12,18 @@
import javax.jms.Queue;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
+
import org.spydermq.SpyMessage;
-import org.spydermq.JMSServerQueue;
import org.spydermq.SpyDestination;
-import org.spydermq.JMSServer;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.Log;
+import org.spydermq.SpyAcknowledgementItem;
+import org.spydermq.server.JMSServer;
+import org.spydermq.Transaction;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
+import org.spydermq.multiplexor.SocketMultiplexor;
+
import java.rmi.RemoteException;
import java.net.ServerSocket;
import java.net.Socket;
@@ -31,21 +35,21 @@
import java.io.IOException;
/**
- * The OIL implementation of the DistributedJMSServer object
+ * The UIL implementation of the DistributedJMSServer object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
-import org.spydermq.multiplexor.SocketMultiplexor;import
org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerUIL
+public class DistributedJMSServerUIL
implements Runnable, DistributedJMSServerSetup, DistributedJMSServerUILMBean
{
// Attributes ----------------------------------------------------
//The server implementation
- private static JMSServer server;
+ protected static JMSServer server;
// Constructor ---------------------------------------------------
@@ -71,8 +75,9 @@
static final int ConnectionListening = 13;
static final int Acknowledge = 14;
static final int SetSpyDistributedConnection = 15;
+ static final int Transact = 16;
- private ServerSocket serverSocket;
+ protected ServerSocket serverSocket;
void exportObject()
{
@@ -131,13 +136,13 @@
result=server.getID();
break;
case NewMessage:
-
newMessage((String)in.readObject(),in.readInt(),in);
+
server.addMessage(spyDistributedConnection, (SpyMessage)in.readObject());
break;
case Subscribe:
-
server.subscribe((Destination)in.readObject(),spyDistributedConnection);
+
server.subscribe(spyDistributedConnection, (Destination)in.readObject());
break;
case Unsubscribe:
-
server.unsubscribe((Destination)in.readObject(),spyDistributedConnection);
+
server.unsubscribe(spyDistributedConnection,(Destination)in.readObject());
break;
case CreateTopic:
result=(Topic)server.createTopic((String)in.readObject());
@@ -162,16 +167,13 @@
server.checkID((String)in.readObject());
break;
case QueueReceive:
-
result=server.queueReceive((Queue)in.readObject(),
in.readLong(),spyDistributedConnection);
+
result=server.queueReceive(spyDistributedConnection,(Queue)in.readObject(),
in.readLong());
break;
case ConnectionListening:
-
server.connectionListening(in.readBoolean(),(Destination)in.readObject(),spyDistributedConnection);
+
server.connectionListening(spyDistributedConnection,in.readBoolean(),(Destination)in.readObject());
break;
case Acknowledge:
- SpyAcknowledgementItem items[] = new
SpyAcknowledgementItem[in.readInt()];
- for( int i=0; i < items.length; i++ )
- items[i] =
(SpyAcknowledgementItem)in.readObject();
- server.acknowledge(items,
in.readBoolean(),spyDistributedConnection);
+
server.acknowledge(spyDistributedConnection, (SpyAcknowledgementItem)in.readObject());
break;
case SetSpyDistributedConnection:
spyDistributedConnection =
(SpyDistributedConnection)in.readObject();
@@ -180,6 +182,9 @@
((ConnectionReceiverUILClient)spyDistributedConnection.cr).createConnection();
}
break;
+ case Transact:
+
server.transact(spyDistributedConnection, (Transaction)in.readObject());
+ break;
default:
throw new RemoteException("Bad method
code !");
}
@@ -235,16 +240,7 @@
Log.error("Closing socket: "+st);
Log.error(e);
}
-
- void newMessage(String id,int nb,ObjectInputStream in) throws Exception
- {
- SpyMessage mes[] = new SpyMessage[nb];
- for(int i=0;i<nb;i++)
- mes[i]=(SpyMessage)in.readObject();
- server.newMessage(mes, id);
- }
-
// --
public DistributedJMSServer createClient() throws Exception
{
1.3 +20 -24
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java
Index: DistributedJMSServerRMI.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- DistributedJMSServerRMI.java 2000/11/19 20:00:01 1.2
+++ DistributedJMSServerRMI.java 2000/12/12 05:58:51 1.3
@@ -5,7 +5,6 @@
* See terms of license at gnu.org.
*/
package org.spydermq.distributed.server;
-
import javax.jms.JMSException;
import javax.jms.Destination;
import javax.jms.Topic;
@@ -19,32 +18,29 @@
import org.spydermq.SpyDistributedConnection;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.SpyAcknowledgementItem;
-
/**
- * The RMI interface of the DistributedJMSServer object
+ *The RMI interface of the DistributedJMSServer object
*
- * @author Norbert Lataille ([EMAIL PROTECTED])
+ *@author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ *@version $Revision: 1.3 $
*/
-public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote
-{
-
+public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote {
// Public --------------------------------------------------------
- public String getID() throws JMSException, RemoteException;
- public void newMessage(SpyMessage val[],String id) throws JMSException,
RemoteException;
- public Topic createTopic(String dest) throws JMSException, RemoteException;
- public Queue createQueue(String dest) throws JMSException, RemoteException;
- public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException, RemoteException;
- public void checkID(String ID) throws JMSException, RemoteException;
- public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection
newSpyDistributedConnection) throws RemoteException;
- public void acknowledge(SpyAcknowledgementItem[] items, boolean
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException;
- public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException;
- public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws Exception, RemoteException;
- public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException;
- public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException;
- public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws Exception, RemoteException;
- public void subscribe(Destination dest, SpyDistributedConnection dc) throws
JMSException, RemoteException;
- public void unsubscribe(Destination dest, SpyDistributedConnection dc) throws
JMSException, RemoteException;
-
+ public String getID() throws RemoteException, Exception;
+ public void checkID(String ID) throws RemoteException, Exception;
+ public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection
newSpyDistributedConnection) throws RemoteException, Exception;
+ public void connectionClosing(SpyDistributedConnection dc) throws
RemoteException, Exception;
+ public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
RemoteException, Exception;
+ public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
RemoteException, Exception;
+ public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws RemoteException, Exception;
+ public void addMessage(SpyDistributedConnection dc, SpyMessage message) throws
RemoteException, Exception;
+ public void connectionListening(SpyDistributedConnection dc, boolean mode,
Destination dest) throws RemoteException, Exception;
+ public Queue createQueue(SpyDistributedConnection dc, String dest) throws
RemoteException, Exception;
+ public Topic createTopic(SpyDistributedConnection dc, String dest) throws
RemoteException, Exception;
+ public void deleteTemporaryDestination(SpyDistributedConnection dc,
SpyDestination dest) throws RemoteException, Exception;
+ public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long
wait) throws RemoteException, Exception;
+ public void subscribe(SpyDistributedConnection dc, Destination dest) throws
RemoteException, Exception;
+ public void transact(SpyDistributedConnection dc, org.spydermq.Transaction t)
throws RemoteException, Exception;
+ public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws
RemoteException, Exception;
}
1.11 +11 -3
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
Index: ConnectionReceiverOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- ConnectionReceiverOIL.java 2000/11/29 17:06:36 1.10
+++ ConnectionReceiverOIL.java 2000/12/12 05:58:51 1.11
@@ -8,18 +8,19 @@
import javax.jms.Destination;
import javax.jms.JMSException;
-import org.spydermq.SpyConnection;
+import org.spydermq.SpyConnection;
import org.spydermq.SpyMessage;
import org.spydermq.SpySession;
-
import org.spydermq.SpyDestination;
import org.spydermq.SpyTopicConnection;
import org.spydermq.SpyQueueSession;
import org.spydermq.Log;
import org.spydermq.NoReceiverException;
+import org.spydermq.SpyMessageConsumer;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
+
import java.util.Hashtable;
import java.util.HashSet;
import java.util.Iterator;
@@ -35,7 +36,14 @@
import java.io.ObjectOutputStream;
import java.io.IOException;
-import org.spydermq.SpyMessageConsumer;
+/**
+ * The OIL implementation of the ConnectionReceiver object
+ *
+ * @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
+ *
+ * @version $Revision: 1.11 $
+ */
public class ConnectionReceiverOIL
implements Runnable, ConnectionReceiverSetup
{
1.4 +144 -199
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java
Index: DistributedJMSServerUILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- DistributedJMSServerUILClient.java 2000/11/29 17:06:36 1.3
+++ DistributedJMSServerUILClient.java 2000/12/12 05:58:52 1.4
@@ -12,11 +12,17 @@
import javax.jms.Queue;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
+
import org.spydermq.SpyMessage;
import org.spydermq.SpyDestination;
-import org.spydermq.JMSServer;
import org.spydermq.Log;
import org.spydermq.SpyDistributedConnection;
+import org.spydermq.SpyAcknowledgementItem;
+import org.spydermq.Transaction;
+import org.spydermq.server.JMSServer;
+import org.spydermq.distributed.interfaces.DistributedJMSServer;
+import org.spydermq.multiplexor.SocketMultiplexor;
+
import java.rmi.RemoteException;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
@@ -26,20 +32,17 @@
import java.io.Serializable;
import java.net.Socket;
import java.net.InetAddress;
-import org.spydermq.distributed.interfaces.DistributedJMSServer;
/**
- * The OIL implementation of the DistributedJMSServer object
- *
- * @author Norbert Lataille ([EMAIL PROTECTED])
- * @author Hiram Chirino ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.3 $
+ *The UIL implementation of the DistributedJMSServer object
+ *
+ *@author Norbert Lataille ([EMAIL PROTECTED])
+ *@author Hiram Chirino ([EMAIL PROTECTED])
+ *
+ *@version $Revision: 1.4 $
*/
-import org.spydermq.multiplexor.SocketMultiplexor;import
org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerUILClient
- implements DistributedJMSServer, Serializable
-{
-
+public class DistributedJMSServerUILClient implements DistributedJMSServer,
Serializable {
+
static final int GetID = 1;
static final int NewMessage = 2;
static final int Subscribe = 3;
@@ -51,61 +54,49 @@
static final int ConnectionClouting = 9;
static final int DeleteTemporaryDestination = 10;
static final int CheckID = 11;
- static final int QueueReceive = 12;
+ static final int QueueReceive = 12;
static final int ConnectionListening = 13;
- static final int Acknowledge = 14;
- static final int SetSpyDistributedConnection = 15;
-
-
+ static final int Acknowledge = 14;
+ static final int SetSpyDistributedConnection = 15;
+ static final int Transact = 16;
+
//Remote stuff
- private int port;
- private InetAddress addr;
-
- private transient Socket socket;
- transient SocketMultiplexor mSocket;
-
- private transient ObjectOutputStream out;
- private transient ObjectInputStream in;
-
-
- public DistributedJMSServerUILClient(InetAddress addr,int port)
- {
- socket=null;
- this.port=port;
- this.addr=addr;
- }
-
- void createConnection() throws RemoteException
- {
- try {
-
- System.out.println("ClientUIL initializing");
- socket=new Socket(addr,port);
- mSocket = new SocketMultiplexor( socket );
+ protected int port;
+ protected InetAddress addr;
+ protected transient SocketMultiplexor mSocket;
+ protected transient ObjectOutputStream out;
+ protected transient ObjectInputStream in;
- out=new ObjectOutputStream(new
BufferedOutputStream(mSocket.getOutputStream(1)));
+ public DistributedJMSServerUILClient(InetAddress addr, int port) {
+ mSocket = null;
+ this.port = port;
+ this.addr = addr;
+ }
+
+ protected void createConnection() throws RemoteException {
+ try {
+ Socket socket = new Socket(addr, port);
+ mSocket = new SocketMultiplexor(socket);
+ out = new ObjectOutputStream(new
BufferedOutputStream(mSocket.getOutputStream(1)));
out.flush();
-
- in=new ObjectInputStream(new
BufferedInputStream(mSocket.getInputStream(1)));
- System.out.println("ClientUIL initialized");
-
+ in = new ObjectInputStream(new
BufferedInputStream(mSocket.getInputStream(1)));
} catch (Exception e) {
failure(e);
}
}
-
- public Object waitAnswer() throws RemoteException
- {
+
+ public Object waitAnswer() throws RemoteException {
try {
out.reset();
out.flush();
- int val=in.readByte();
- if (val==0) return null;
- if (val==1) {
+ int val = in.readByte();
+ if (val == 0)
+ return null;
+ if (val == 1) {
return in.readObject();
} else {
- Exception e=(Exception)in.readObject();
- throw new RemoteException("",e);
+ Exception e = (Exception) in.readObject();
+ throw new RemoteException("", e);
}
} catch (RemoteException e) {
Log.log(e);
@@ -114,157 +105,104 @@
failure(e);
return null;
}
-
- }
-
- void failure(Exception e) throws RemoteException
- {
+ }
+
+ void failure(Exception e) throws RemoteException {
Log.error(e);
throw new RemoteException("Cannot contact the remote object");
- }
-
- //--- Remote Calls
-
- public void newMessage(SpyMessage val[],String id) throws JMSException,
RemoteException
- {
- if (socket==null) createConnection();
-
- try {
- out.writeByte(NewMessage);
- out.writeObject(id);
- out.writeInt(val.length);
- for(int i=0;i<val.length;i++)
- out.writeObject(val[i]);
- } catch (IOException e) {
- failure(e);
- }
-
- waitAnswer();
}
- public String getID() throws Exception
- {
- if (socket==null) createConnection();
-
+ public String getID() throws Exception {
+ checkConnection();
try {
out.writeByte(GetID);
} catch (IOException e) {
failure(e);
}
-
- return (String)waitAnswer();
+ return (String) waitAnswer();
}
-
-
-
-
- public Topic createTopic(String dest) throws JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+ public void checkID(String ID) throws JMSException, RemoteException {
+ checkConnection();
try {
- out.writeByte(CreateTopic);
- out.writeObject(dest);
+ out.writeByte(CheckID);
+ out.writeObject(ID);
} catch (IOException e) {
failure(e);
}
-
- return (Topic)waitAnswer();
+ waitAnswer();
}
-
- public Queue createQueue(String dest) throws JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+
+ public void setSpyDistributedConnection(SpyDistributedConnection dest) throws
RemoteException {
+ checkConnection();
try {
- out.writeByte(CreateQueue);
+ out.writeByte(SetSpyDistributedConnection);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
-
- return (Queue)waitAnswer();
+ waitAnswer();
}
-
-
-
-
-
-
- public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+ public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException {
+ checkConnection();
try {
- out.writeByte(DeleteTemporaryDestination);
- out.writeObject(dest);
+ out.writeByte(ConnectionClouting);
} catch (IOException e) {
failure(e);
}
-
waitAnswer();
}
-
- public void checkID(String ID) throws JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+
+ public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException {
+ checkConnection();
try {
- out.writeByte(CheckID);
- out.writeObject(ID);
+ out.writeByte(GetTemporaryQueue);
} catch (IOException e) {
failure(e);
}
-
- waitAnswer();
+ return (TemporaryQueue) waitAnswer();
}
-
- public void setSpyDistributedConnection(SpyDistributedConnection dest) throws
RemoteException {
- if (socket==null) createConnection();
-
+
+ public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException {
+ checkConnection();
try {
- out.writeByte( SetSpyDistributedConnection );
- out.writeObject(dest);
+ out.writeByte(GetTemporaryTopic);
} catch (IOException e) {
failure(e);
- }
- waitAnswer();
- }
-
- public void acknowledge(SpyAcknowledgementItem item[],boolean
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException {
- if (socket==null) createConnection();
-
+ }
+ return (TemporaryTopic) waitAnswer();
+ }
+
+ public void acknowledge(SpyDistributedConnection dc, SpyAcknowledgementItem
item) throws JMSException, RemoteException {
+ checkConnection();
try {
out.writeByte(Acknowledge);
- out.writeInt(item.length);
- for( int i=0; i< item.length; i++ )
- out.writeObject(item[i]);
- out.writeBoolean(isAck);
+ out.writeObject(item);
} catch (IOException e) {
failure(e);
}
-
waitAnswer();
- }
-
- public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+ }
+
+ public void addMessage(SpyDistributedConnection dc, SpyMessage val) throws
JMSException, RemoteException {
+ checkConnection();
try {
- out.writeByte(ConnectionClouting);
+ out.writeByte(NewMessage);
+ out.writeObject(val);
} catch (IOException e) {
failure(e);
}
waitAnswer();
- }
-
- public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws Exception, RemoteException
- {
- if (socket==null) createConnection();
-
+ }
+
+ protected void checkConnection() throws RemoteException {
+ if (mSocket == null)
+ createConnection();
+ }
+
+ public void connectionListening(SpyDistributedConnection dc, boolean mode,
Destination dest) throws Exception, RemoteException {
+ checkConnection();
try {
out.writeByte(ConnectionListening);
out.writeBoolean(mode);
@@ -272,77 +210,84 @@
} catch (IOException e) {
failure(e);
}
-
waitAnswer();
- }
-
- public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+ }
+
+ public Queue createQueue(SpyDistributedConnection dc, String dest) throws
JMSException, RemoteException {
+ checkConnection();
try {
- out.writeByte(GetTemporaryQueue);
+ out.writeByte(CreateQueue);
+ out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
-
- return (TemporaryQueue)waitAnswer();
- }
-
- public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+ return (Queue) waitAnswer();
+ }
+
+ public Topic createTopic(SpyDistributedConnection dc, String dest) throws
JMSException, RemoteException {
+ checkConnection();
try {
- out.writeByte(GetTemporaryTopic);
+ out.writeByte(CreateTopic);
+ out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
-
- return (TemporaryTopic)waitAnswer();
- }
-
- public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws Exception, RemoteException
- {
- if (socket==null) createConnection();
-
+ return (Topic) waitAnswer();
+ }
+
+ public void deleteTemporaryDestination(SpyDistributedConnection dc,
SpyDestination dest) throws JMSException, RemoteException {
+ checkConnection();
try {
+ out.writeByte(DeleteTemporaryDestination);
+ out.writeObject(dest);
+ } catch (IOException e) {
+ failure(e);
+ }
+ waitAnswer();
+ }
+
+ public SpyMessage queueReceive(SpyDistributedConnection dc, Queue queue, long
wait) throws Exception, RemoteException {
+ checkConnection();
+ try {
out.writeByte(QueueReceive);
out.writeObject(queue);
out.writeLong(wait);
} catch (IOException e) {
failure(e);
}
-
- return (SpyMessage)waitAnswer();
- }
-
- public void subscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+ return (SpyMessage) waitAnswer();
+ }
+
+ public void subscribe(SpyDistributedConnection dc, Destination dest) throws
JMSException, RemoteException {
+ checkConnection();
try {
out.writeByte(Subscribe);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
+ }
+ waitAnswer();
+ }
+
+ public void transact(org.spydermq.SpyDistributedConnection dc, Transaction t)
throws JMSException, RemoteException {
+ checkConnection();
+ try {
+ out.writeByte(Transact);
+ out.writeObject(t);
+ } catch (IOException e) {
+ failure(e);
}
-
waitAnswer();
- }
-
- public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
- if (socket==null) createConnection();
-
+ }
+
+ public void unsubscribe(SpyDistributedConnection dc, Destination dest) throws
JMSException, RemoteException {
+ checkConnection();
try {
out.writeByte(Unsubscribe);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
-
waitAnswer();
}
-
}
1.7 +35 -85
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
Index: DistributedJMSServerOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- DistributedJMSServerOIL.java 2000/11/29 17:06:36 1.6
+++ DistributedJMSServerOIL.java 2000/12/12 05:58:52 1.7
@@ -1,3 +1,9 @@
+/*
+ * spyderMQ, the OpenSource JMS implementation
+ *
+ * Distributable under GPL license.
+ * See terms of license at gnu.org.
+ */
package org.spydermq.distributed.server;
import javax.jms.JMSException;
@@ -6,14 +12,16 @@
import javax.jms.Queue;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
+
import org.spydermq.SpyMessage;
-import org.spydermq.JMSServerQueue;
import org.spydermq.SpyDestination;
-import org.spydermq.JMSServer;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.Log;
+import org.spydermq.SpyAcknowledgementItem;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
+import org.spydermq.Transaction;
+
import java.rmi.RemoteException;
import java.net.ServerSocket;
import java.net.Socket;
@@ -23,54 +31,23 @@
import java.io.ObjectInputStream;
import java.io.BufferedInputStream;
import java.io.IOException;
-import org.spydermq.SpyAcknowledgementItem;
-public class DistributedJMSServerOIL
- implements Runnable, DistributedJMSServerSetup, DistributedJMSServerOILMBean
+/**
+ * The OIL implementation of the DistributedJMSServer object
+ *
+ * @author Norbert Lataille ([EMAIL PROTECTED])
+ * @author Hiram Chirino ([EMAIL PROTECTED])
+ *
+ * @version $Revision: 1.7 $
+ */
+public class DistributedJMSServerOIL extends DistributedJMSServerUIL
+ implements DistributedJMSServerOILMBean
{
-
- // Attributes ----------------------------------------------------
-
- //The server implementation
- private static JMSServer server;
-
- // Constructor ---------------------------------------------------
- public DistributedJMSServerOIL()
- {
- exportObject();
+ public DistributedJMSServerOIL() {
+ // Default constructor
}
- // Internals -----------------------------------------------------
-
- static final int GetID = 1;
- static final int NewMessage = 2;
- static final int Subscribe = 3;
- static final int Unsubscribe = 4;
- static final int CreateTopic = 5;
- static final int CreateQueue = 6;
- static final int GetTemporaryTopic = 7;
- static final int GetTemporaryQueue = 8;
- static final int ConnectionClosing = 9;
- static final int DeleteTemporaryDestination = 10;
- static final int CheckID = 11;
- static final int QueueReceive = 12;
- static final int ConnectionListening = 13;
- static final int Acknowledge = 14;
- static final int SetSpyDistributedConnection = 15;
-
- private ServerSocket serverSocket;
-
- void exportObject()
- {
- try {
- serverSocket = new ServerSocket(0);
- new Thread(this).start();
- } catch (IOException e) {
- failure("Initialization",e);
- }
- }
-
public void run()
{
Socket socket = null;
@@ -93,14 +70,12 @@
failure("Initialisation",e);
return;
}
-
+
while (!closed) {
try {
code=in.readByte();
} catch (IOException e) {
- if( closed )
- break;
Log.notice("Command read");
Log.notice(e);
break;
@@ -116,13 +91,13 @@
result=server.getID();
break;
case NewMessage:
-
newMessage((String)in.readObject(),in.readInt(),in);
+
server.addMessage(spyDistributedConnection, (SpyMessage)in.readObject());
break;
case Subscribe:
-
server.subscribe((Destination)in.readObject(),spyDistributedConnection);
+
server.subscribe(spyDistributedConnection, (Destination)in.readObject());
break;
case Unsubscribe:
-
server.unsubscribe((Destination)in.readObject(),spyDistributedConnection);
+
server.unsubscribe(spyDistributedConnection,(Destination)in.readObject());
break;
case CreateTopic:
result=(Topic)server.createTopic((String)in.readObject());
@@ -138,7 +113,7 @@
break;
case ConnectionClosing:
server.connectionClosing(spyDistributedConnection,null);
- closed=true;
+ closed = true;
break;
case DeleteTemporaryDestination:
server.deleteTemporaryDestination((SpyDestination)in.readObject());
@@ -147,20 +122,20 @@
server.checkID((String)in.readObject());
break;
case QueueReceive:
-
result=server.queueReceive((Queue)in.readObject(),
in.readLong(),spyDistributedConnection);
+
result=server.queueReceive(spyDistributedConnection,(Queue)in.readObject(),
in.readLong());
break;
case ConnectionListening:
-
server.connectionListening(in.readBoolean(),(Destination)in.readObject(),spyDistributedConnection);
+
server.connectionListening(spyDistributedConnection,in.readBoolean(),(Destination)in.readObject());
break;
case Acknowledge:
- SpyAcknowledgementItem items[] = new
SpyAcknowledgementItem[in.readInt()];
- for( int i=0; i < items.length; i++ )
- items[i] =
(SpyAcknowledgementItem)in.readObject();
- server.acknowledge(items,
in.readBoolean(),spyDistributedConnection);
+
server.acknowledge(spyDistributedConnection, (SpyAcknowledgementItem)in.readObject());
break;
case SetSpyDistributedConnection:
spyDistributedConnection =
(SpyDistributedConnection)in.readObject();
break;
+ case Transact:
+
server.transact(spyDistributedConnection, (Transaction)in.readObject());
+ break;
default:
throw new RemoteException("Bad method
code !");
}
@@ -189,16 +164,14 @@
try {
out.writeByte(2);
- out.writeObject(e.getMessage());
+ out.writeObject(e);
out.reset();
out.flush();
} catch (IOException e2) {
failure("Result write",e2);
break;
- }
-
+ }
}
-
}
try {
@@ -210,33 +183,10 @@
Log.log(e);
}
}
-
- void failure(String st,Exception e)
- {
- Log.error("Closing socket: "+st);
- Log.error(e);
- }
-
- void newMessage(String id,int nb,ObjectInputStream in) throws Exception
- {
- SpyMessage mes[] = new SpyMessage[nb];
- for(int i=0;i<nb;i++)
- mes[i]=(SpyMessage)in.readObject();
- server.newMessage(mes, id);
- }
-
- // --
-
public DistributedJMSServer createClient() throws Exception
{
return new
DistributedJMSServerOILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
}
- public void setServer(JMSServer s)
- {
- server=s;
- }
-
-
}
1.4 +5 -3
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMI.java
Index: ConnectionReceiverRMI.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMI.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- ConnectionReceiverRMI.java 2000/06/19 21:52:00 1.3
+++ ConnectionReceiverRMI.java 2000/12/12 05:58:52 1.4
@@ -8,18 +8,20 @@
import javax.jms.Destination;
import javax.jms.JMSException;
+
import org.spydermq.SpyMessage;
import org.spydermq.SpyDestination;
+import org.spydermq.distributed.interfaces.ConnectionReceiver;
+
import java.rmi.Remote;
import java.rmi.RemoteException;
-import org.spydermq.distributed.interfaces.ConnectionReceiver;
/**
* The RMI interface of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public interface ConnectionReceiverRMI
extends ConnectionReceiver, Remote
@@ -27,7 +29,7 @@
// Public --------------------------------------------------------
//A message has arrived for the Connection
- public void receive(SpyDestination b,SpyMessage c) throws RemoteException,
JMSException;
+ public void receive(SpyDestination b,SpyMessage c) throws RemoteException,
JMSException;
//Messages have arrived for the Connection
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
RemoteException, JMSException;
//One TemporaryDestination has been deleted
1.1
spyderMQ/src/java/org/spydermq/distributed/server/DistributedConnectionFactoryRMIImplMBean.java
Index: DistributedConnectionFactoryRMIImplMBean.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.distributed.server;
/**
 * Management interface implemented by DistributedConnectionFactoryRMIImpl.
 *
 * It currently declares no attributes or operations.
 * NOTE(review): the name presumably follows the JMX standard-MBean
 * convention (implementation class name + "MBean") - confirm before renaming.
 */
public interface DistributedConnectionFactoryRMIImplMBean {
    // Intentionally empty: nothing is exposed for management yet.
}
1.1
spyderMQ/src/java/org/spydermq/distributed/server/DistributedConnectionFactoryRMI.java
Index: DistributedConnectionFactoryRMI.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.distributed.server;
import javax.jms.QueueConnection;
import javax.jms.TopicConnection;
import java.rmi.Remote;
import java.rmi.RemoteException;
import org.spydermq.distributed.interfaces.DistributedConnectionFactory;
/**
 * RMI-facing variant of DistributedConnectionFactory: the same factory
 * contract, additionally marked as {@link Remote} so that it can be
 * invoked through RMI.
 *
 * Every method declares <code>throws Exception</code>, which also covers
 * the RemoteException that a remote call may raise.
 *
 * @author Norbert Lataille ([EMAIL PROTECTED])
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public interface DistributedConnectionFactoryRMI
    extends DistributedConnectionFactory, Remote
{
    // Queue connections ---------------------------------------------

    /**
     * Creates a queue connection without supplying credentials.
     *
     * @return the new queue connection
     * @throws Exception if the connection cannot be created or the call fails
     */
    QueueConnection createQueueConnection()
        throws Exception;

    /**
     * Creates a queue connection for the given credentials.
     *
     * @param userName the name of the connecting user
     * @param password the password of the connecting user
     * @return the new queue connection
     * @throws Exception if the connection cannot be created or the call fails
     */
    QueueConnection createQueueConnection(String userName, String password)
        throws Exception;

    // Topic connections ---------------------------------------------

    /**
     * Creates a topic connection without supplying credentials.
     *
     * @return the new topic connection
     * @throws Exception if the connection cannot be created or the call fails
     */
    TopicConnection createTopicConnection()
        throws Exception;

    /**
     * Creates a topic connection for the given credentials.
     *
     * @param userName the name of the connecting user
     * @param password the password of the connecting user
     * @return the new topic connection
     * @throws Exception if the connection cannot be created or the call fails
     */
    TopicConnection createTopicConnection(String userName, String password)
        throws Exception;
}
1.1
spyderMQ/src/java/org/spydermq/distributed/server/DistributedConnectionFactoryRMIImpl.java
Index: DistributedConnectionFactoryRMIImpl.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.distributed.server;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.JMSException;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.SpyQueueConnection;
import org.spydermq.security.SecurityManager;
import java.util.Hashtable;
import javax.jms.TopicConnectionFactory;
import org.spydermq.SpyTopicConnection;
import org.spydermq.server.JMSServer;
import javax.jms.TopicConnection;
/**
 * The RMI implementation of the DistributedConnectionFactory object.
 *
 * Hands out SpyQueueConnection and SpyTopicConnection instances that are
 * bound to the configured DistributedJMSServer. The server, the connection
 * receiver class name and the security manager are not constructor
 * arguments; they are supplied afterwards through the setters below.
 *
 * @author Norbert Lataille ([EMAIL PROTECTED])
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class DistributedConnectionFactoryRMIImpl
    extends UnicastRemoteObject
    implements DistributedConnectionFactoryRMI,
    DistributedConnectionFactoryRMIImplMBean
{
    // Attributes ----------------------------------------------------

    /** Server handed to every connection this factory creates. */
    protected DistributedJMSServer server;

    /** Connection receiver class name handed to every created connection. */
    private String crCN;

    /** Validates credentials for the user/password factory methods. */
    protected SecurityManager securityManager;

    // Constructor ---------------------------------------------------

    /**
     * Creates the factory; all configuration happens through the setters.
     *
     * @throws RemoteException if the UnicastRemoteObject constructor fails
     */
    public DistributedConnectionFactoryRMIImpl() throws RemoteException
    {
        super();
    }

    // Public --------------------------------------------------------

    /**
     * Sets the server passed to the connections created by this factory.
     *
     * @param theServer the server to use for subsequently created connections
     */
    public void setServer(DistributedJMSServer theServer)
    {
        server = theServer;
    }

    /**
     * Sets the connection receiver class name.
     * Alias of {@link #setConnectionReceiverClassName(String)}; both write
     * the same field.
     *
     * @param className the class name passed to created connections
     */
    public void setCRClassName(String className)
    {
        crCN = className;
    }

    /**
     * Sets the security manager used by the credentialed factory methods.
     *
     * @param securityManager the manager whose checkUser method validates
     *        user name and password
     */
    public void setSecurityManager(SecurityManager securityManager)
    {
        this.securityManager = securityManager;
    }

    /**
     * Creates a queue connection without credentials; no identifier is
     * passed to the connection (null).
     *
     * @return a new SpyQueueConnection bound to the configured server
     * @throws JMSException declared for the connection constructor
     */
    public QueueConnection createQueueConnection() throws JMSException
    {
        return new SpyQueueConnection(server, null, crCN);
    }

    /**
     * Creates a queue connection for the given credentials.
     *
     * @param userName the name of the connecting user
     * @param password the password of the connecting user
     * @return a new SpyQueueConnection carrying the id returned by the
     *         security manager
     * @throws JMSException if no security manager is configured, or if the
     *         credential check or the connection creation fails
     */
    public QueueConnection createQueueConnection(String userName, String password)
        throws JMSException
    {
        String id = authenticate(userName, password);
        return new SpyQueueConnection(server, id, crCN);
    }

    /**
     * Creates a topic connection without credentials; no identifier is
     * passed to the connection (null).
     *
     * @return a new SpyTopicConnection bound to the configured server
     * @throws JMSException declared for the connection constructor
     */
    public TopicConnection createTopicConnection() throws JMSException
    {
        return new SpyTopicConnection(server, null, crCN);
    }

    /**
     * Creates a topic connection for the given credentials.
     *
     * @param userName the name of the connecting user
     * @param password the password of the connecting user
     * @return a new SpyTopicConnection carrying the id returned by the
     *         security manager
     * @throws JMSException if no security manager is configured, or if the
     *         credential check or the connection creation fails
     */
    public TopicConnection createTopicConnection(String userName, String password)
        throws JMSException
    {
        String id = authenticate(userName, password);
        return new SpyTopicConnection(server, id, crCN);
    }

    /**
     * Sets the connection receiver class name.
     * Writes the same field as {@link #setCRClassName(String)}.
     *
     * @param className the class name passed to created connections
     */
    public void setConnectionReceiverClassName(String className)
    {
        crCN = className;
    }

    // Private -------------------------------------------------------

    /**
     * Runs the credential check shared by both credentialed factory methods.
     *
     * Fails with a descriptive JMSException instead of a bare
     * NullPointerException when setSecurityManager was never called, so a
     * misconfigured factory is reported to the caller in a diagnosable way.
     *
     * @param userName the name of the connecting user
     * @param password the password of the connecting user
     * @return whatever SecurityManager.checkUser returns; it is passed on to
     *         the connection as its id
     * @throws JMSException if no security manager is configured, or if
     *         checkUser itself throws one
     */
    private String authenticate(String userName, String password) throws JMSException
    {
        if (securityManager == null) {
            throw new JMSException(
                "No security manager configured; cannot authenticate the connection request");
        }
        return securityManager.checkUser(userName, password);
    }
}