User: hiram
Date: 00/11/19 12:00:04
Modified: src/java/org/spydermq/distributed/server
ConnectionReceiverOIL.java
ConnectionReceiverOILClient.java
ConnectionReceiverRMIImpl.java
DistributedJMSServerOIL.java
DistributedJMSServerOILClient.java
DistributedJMSServerRMI.java
DistributedJMSServerRMIImpl.java
ConnectionReceiverUIL.java
ConnectionReceiverUILClient.java
DistributedJMSServerUIL.java
DistributedJMSServerUILClient.java
DistributedJMSServerUILMBean.java
Log:
Commiting several changes:
- Removed ConnectionQueue and SessionQueue. All consumer managment is done at the
SpyConnection now.
- Unacknowledged messages are maintained on the server side now.
(JMSServerQueueReceiver)
- Acknowlegment messages are sent to the server from the client.
- Optimized the OIL and UIL transports by caching the DistributedConnection on the
JMSServer when the ConnectionReciver is setup.
- Cleaned up the OIL by only using the Object(Output/Input) streams instead of both
Object(Output/Input) and Buffered(Output/Input) streams.
- A QueueReceiver now does a request for a single message on a receive() method
instead turning on/off listen to get a message.
- For the OIL and UIL, a connection failure/termination is now handled gracefully
(if connectionCLosing() was called, no errors shown, if it was not called and the
connection failed, we call it).
Revision Changes Path
1.8 +67 -134
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java
Index: ConnectionReceiverOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOIL.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- ConnectionReceiverOIL.java 2000/11/14 06:03:19 1.7
+++ ConnectionReceiverOIL.java 2000/11/19 20:00:01 1.8
@@ -9,10 +9,10 @@
import javax.jms.Destination;
import javax.jms.JMSException;
import org.spydermq.SpyConnection;
-import org.spydermq.ConnectionQueue;
+
import org.spydermq.SpyMessage;
import org.spydermq.SpySession;
-import org.spydermq.SessionQueue;
+
import org.spydermq.SpyDestination;
import org.spydermq.SpyTopicConnection;
import org.spydermq.SpyQueueSession;
@@ -35,13 +35,7 @@
import java.io.ObjectOutputStream;
import java.io.IOException;
-/**
- * The OIL implementation of the ConnectionReceiver object
- *
- * @author Norbert Lataille ([EMAIL PROTECTED])
- *
- * @version $Revision: 1.7 $
- */
+import org.spydermq.SpyMessageConsumer;
public class ConnectionReceiverOIL
implements Runnable, ConnectionReceiverSetup
{
@@ -83,27 +77,15 @@
{
Socket socket = null;
int code = 0;
- //InputStream is=null;
- //OutputStream os=null;
- BufferedInputStream is=null;
- BufferedOutputStream os=null;
ObjectOutputStream out=null;
ObjectInputStream in=null;
try {
+
socket = serverSocket.accept();
-
- //We have our connection to the broker... there's no need to
wait for another connection
- //new Thread(this).start();
-
- //is = socket.getInputStream();
- //os = socket.getOutputStream();
- is = new BufferedInputStream(socket.getInputStream());
- os = new BufferedOutputStream(socket.getOutputStream());
-
- out = new ObjectOutputStream(os);
+ out = new ObjectOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
out.flush();
- in = new ObjectInputStream(is);
+ in = new ObjectInputStream(new
BufferedInputStream(socket.getInputStream()));
} catch (IOException e) {
failure("Initialisation",e);
@@ -113,7 +95,7 @@
while (true) {
try {
- code=is.read();
+ code=in.readByte();
} catch (IOException e) {
failure("Command read",e);
e.printStackTrace();
@@ -146,24 +128,24 @@
//Everthing was OK
try {
- os.write(0);
- os.flush();
+ out.writeByte(0);
+ out.flush();
} catch (IOException e) {
failure("Result write",e);
return;
}
- } catch (Exception e) {
+ } catch (Exception e) {
try {
if( e instanceof NoReceiverException ) {
- os.write(2);
+ out.writeByte(2);
} else {
- os.write(1);
+ out.writeByte(1);
}
out.writeObject(e.getMessage());
+ out.flush();
out.flush();
- os.flush();
} catch (IOException e2) {
failure("Result write",e2);
return;
@@ -193,140 +175,91 @@
}
// ---
-
+
//<DEBUG>
-
+
/*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
{
connection.rec++;
}
-
+
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
{
connection.rec++;
}*/
-
+
//</DEBUG>
-
-
+
//A message has arrived for this Connection, We have to dispatch it to the
sessions
- public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
-
- Log.log("ConnectionReceiver:
Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
-
+ public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+
+ Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() +
",Mes=" + mes.toString() + ")");
+
if (connection instanceof SpyTopicConnection) {
-
+
//Get the set of subscribers for this Topic
-
- ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) return;
-
- Iterator i=connectionQueue.subscribers.iterator();
-
- while (i.hasNext()) {
-
- SpySession session=(SpySession)i.next();
-
- //add the new message to the session's queue
- session.dispatchMessage(dest,mes);
-
+ SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+
+ for (int i = 0; i < consumers.length; i++) {
+
+ //add the new message to the consumer's queue
+ consumers[i].addMessage(mes);
+
//There is work to do...
- session.mutex.notifyLock();
+ consumers[i].session.mutex.notifyLock();
}
+
} else {
-
- while (true) {
-
- SessionQueue sq=null;
- try {
-
- //Find one session waiting for this Queue
- if (connection.modeStop) throw new
Exception("This connection is stopped !");
- ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) throw new
Exception("There is no connectionQueue for this destination !");
-
- synchronized (connectionQueue) {
-
- //Find a SessionQueue
- if
(connectionQueue.NumListeningSessions==0) throw new NoReceiverException("There are no
listening sessions for this destination !");
-
- Iterator
i=connectionQueue.subscribers.iterator();
- while (i.hasNext()) {
- SpySession
session=(SpySession)i.next();
-
sq=(SessionQueue)session.destinations.get(dest);
- if
(sq.NumListeningSubscribers!=0) break;
- }
- if
(sq==null||sq.NumListeningSubscribers==0) {
- Log.error("FIXME: The
listeners count was invalid !");
- throw new
NoReceiverException("There are no listening sessions for this destination !");
- }
-
- //Try with this sessionQueue
- Log.log("Dispatching to SessionQueue:
"+mes);
- sq.dispatchMessage(dest,mes);
-
- //Our work is done here
- break;
- }
+ //Find one session waiting for this Queue
+ if (connection.modeStop)
+ throw new JMSException("This connection is stopped !");
+
+ SpyMessageConsumer consumer =
connection.pickListeningConsumer(dest);
+ if (consumer == null)
+ throw new NoReceiverException("There are no listening
sessions for this destination !");
+
+ //Try with this sessionQueue
+ Log.log("Dispatching to SessionQueue: " + mes);
+ ((org.spydermq.SpyQueueReceiver)consumer).dispatchMessage( mes
);
- } catch (NoReceiverException e) {
- //This SessionQueue should not have been
registered !
- throw e;
- } catch (Exception e) {
- //This error is non-recoverable : we must
unregister from this queue
- //Let the JMSServerQueue do its work
- Log.log(e);
- throw new JMSException("There are no listening
sessions in this connection");
- }
- }
-
}
-
- }
- public void receiveMultiple(SpyDestination dest,int nb,ObjectInputStream in)
throws Exception
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
-
+ }
+
+
+ public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in)
throws Exception {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+
Log.log("ConnectionReceiver: ReceiveMultiple()");
-
+
if (connection instanceof SpyTopicConnection) {
-
+
//Get the set of subscribers for this Topic
-
- ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) return;
-
- for(int val=0;val<nb;val++) {
-
- SpyMessage mes=(SpyMessage)in.readObject();
-
- //NL: i is a short-lived object. Try to "group"
messages in an pre-allocated/peristant
- //array and apply the same iterator on this array
- Iterator i=connectionQueue.subscribers.iterator();
-
- while (i.hasNext()) {
-
- SpySession session=(SpySession)i.next();
-
- //add the new message to the session's queue
- session.dispatchMessage(dest,mes);
-
+ SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+
+ for (int val = 0; val < nb; val++) {
+ SpyMessage mes = (SpyMessage) in.readObject();
+
+ for (int i = 0; i < consumers.length; i++) {
+
+ //add the new message to the consumer's queue
+ consumers[i].addMessage(mes);
+
//There is work to do...
- session.mutex.notifyLock();
+ consumers[i].session.mutex.notifyLock();
}
}
-
} else {
- throw new Exception("Multiple dispatch for a Queue");
+ throw new Exception("Multiple dispatch for a Queue");
}
}
- public void close() throws Exception
+ public void close() throws Exception
{
closed=true;
}
1.10 +20 -22
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java
Index: ConnectionReceiverOILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverOILClient.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- ConnectionReceiverOILClient.java 2000/11/14 06:05:05 1.9
+++ ConnectionReceiverOILClient.java 2000/11/19 20:00:01 1.10
@@ -27,8 +27,8 @@
static final int CLOSE = 4;
private transient Socket socket;
- private transient BufferedInputStream is;
- private transient BufferedOutputStream os;
+
+
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
@@ -46,11 +46,9 @@
{
try {
socket=new Socket(addr,port);
- is = new BufferedInputStream(socket.getInputStream());
- os = new BufferedOutputStream(socket.getOutputStream());
- in=new ObjectInputStream(is);
- out=new ObjectOutputStream(os);
- os.flush();
+ out=new ObjectOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
+ out.flush();
+ in=new ObjectInputStream(new
BufferedInputStream(socket.getInputStream()));
} catch (Exception e) {
Log.error(e);
throw new RemoteException("Cannot connect to the
ConnectionReceiver/Server");
@@ -61,8 +59,8 @@
{
Exception throwException=null;
try {
- os.flush();
- int val=is.read();
+ out.flush();
+ int val=in.readByte();
switch(val) {
case 1:
String st=(String)in.readObject();
@@ -81,20 +79,20 @@
if( throwException != null )
throw throwException;
}
-
- public void receive(SpyDestination dest,SpyMessage mes) throws Exception
+
+ public void receive(SpyDestination dest,SpyMessage mes) throws Exception
{
- if (socket==null) createConnection();
- os.write(RECEIVE);
- out.writeObject(dest);
- out.writeObject(mes);
- waitAnswer();
+ if (socket==null) createConnection();
+ out.writeByte(RECEIVE);
+ out.writeObject(dest);
+ out.writeObject(mes);
+ waitAnswer();
}
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
Exception
+ public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
Exception
{
if (socket==null) createConnection();
- os.write(RECEIVE_MULTIPLE);
+ out.writeByte(RECEIVE_MULTIPLE);
out.writeObject(dest);
out.writeInt(mes.length);
for(int i=0;i<mes.length;i++)
@@ -102,18 +100,18 @@
waitAnswer();
}
- public void deleteTemporaryDestination(SpyDestination dest) throws Exception
+ public void deleteTemporaryDestination(SpyDestination dest) throws Exception
{
if (socket==null) createConnection();
- os.write(DELETE_TEMPORARY_DESTINATION);
+ out.writeByte(DELETE_TEMPORARY_DESTINATION);
out.writeObject(dest);
waitAnswer();
}
-
+
public void close() throws Exception
{
if (socket==null) createConnection();
- os.write(CLOSE);
+ out.writeByte(CLOSE);
waitAnswer();
}
1.12 +57 -90
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java
Index: ConnectionReceiverRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverRMIImpl.java,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -r1.11 -r1.12
--- ConnectionReceiverRMIImpl.java 2000/06/19 21:52:00 1.11
+++ ConnectionReceiverRMIImpl.java 2000/11/19 20:00:01 1.12
@@ -9,10 +9,10 @@
import javax.jms.Destination;
import javax.jms.JMSException;
import org.spydermq.SpyConnection;
-import org.spydermq.ConnectionQueue;
+
import org.spydermq.SpyMessage;
import org.spydermq.SpySession;
-import org.spydermq.SessionQueue;
+
import org.spydermq.SpyDestination;
import org.spydermq.SpyTopicConnection;
import org.spydermq.SpyQueueSession;
@@ -26,12 +26,12 @@
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
-/**
+import org.spydermq.SpyMessageConsumer;/**
* The RMI implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.11 $
+ * @version $Revision: 1.12 $
*/
public class ConnectionReceiverRMIImpl
extends UnicastRemoteObject
@@ -58,107 +58,74 @@
{
this.connection=connection;
}
-
- //<DEBUG>
- /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
- {
- connection.rec++;
- }
+ //A message has arrived for this Connection, We have to dispatch it to the
sessions
+ public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
- {
- connection.rec++;
- }*/
-
- //</DEBUG>
+ Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() +
",Mes=" + mes.toString() + ")");
-
- //A message has arrived for this Connection, We have to dispatch it to the
sessions
- public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
-
- Log.log("ConnectionReceiver:
Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
-
if (connection instanceof SpyTopicConnection) {
-
+
//Get the set of subscribers for this Topic
-
- ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) return;
-
- Iterator i=connectionQueue.subscribers.iterator();
-
- while (i.hasNext()) {
-
- SpySession session=(SpySession)i.next();
-
- //add the new message to the session's queue
- session.dispatchMessage(dest,mes);
-
+ SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+
+ for (int i = 0; i < consumers.length; i++) {
+
+ //add the new message to the consumer's queue
+ consumers[i].addMessage(mes);
+
//There is work to do...
- session.mutex.notifyLock();
+ consumers[i].session.mutex.notifyLock();
}
-
+
} else {
-
- while (true) {
-
- SessionQueue sq=null;
-
- try {
-
- //Find one session waiting for this Queue
- if (connection.modeStop) throw new
Exception("This connection is stopped !");
- ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) throw new
Exception("There is no connectionQueue for this destination !");
-
- synchronized (connectionQueue) {
-
- //Find a SessionQueue
- if
(connectionQueue.NumListeningSessions==0) throw new Exception("There are no listening
sessions for this destination !");
-
- Iterator
i=connectionQueue.subscribers.iterator();
- while (i.hasNext()) {
- SpySession
session=(SpySession)i.next();
-
sq=(SessionQueue)session.destinations.get(dest);
- if
(sq.NumListeningSubscribers!=0) break;
- }
- if
(sq==null||sq.NumListeningSubscribers==0) {
- Log.error("FIXME: The
listeners count was invalid !");
- throw new Exception("There are
no listening sessions for this destination !");
- }
-
- //Try with this sessionQueue
- sq.dispatchMessage(dest,mes);
-
- //Our work is done here
- break;
- }
-
- } catch (NoReceiverException e) {
- //This SessionQueue should not have been
registered !
- continue;
- } catch (Exception e) {
- //This error is non-recoverable : we must
unregister from this queue
- //Let the JMSServerQueue do its work
- Log.log(e);
- throw new NoReceiverException("There are no
listening sessions in this connection");
- }
- }
-
+
+ //Find one session waiting for this Queue
+ if (connection.modeStop)
+ throw new JMSException("This connection is stopped !");
+
+ SpyMessageConsumer consumer =
connection.pickListeningConsumer(dest);
+ if (consumer == null)
+ throw new NoReceiverException("There are no listening
sessions for this destination !");
+
+ //Try with this sessionQueue
+ Log.log("Dispatching to SessionQueue: " + mes);
+ ((org.spydermq.SpyQueueReceiver)
consumer).dispatchMessage(mes);
+
}
}
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
{
- for(int i=0;i<mes.length;i++) {
- receive(dest,mes[i]);
- }
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+
+ Log.log("ConnectionReceiver: ReceiveMultiple()");
+
+ if (connection instanceof SpyTopicConnection) {
+
+ //Get the set of subscribers for this Topic
+ SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+
+ for(int i=0;i<mes.length;i++) {
+
+ for (int j = 0; j < consumers.length; j++) {
+
+ //add the new message to the consumer's queue
+ consumers[j].addMessage(mes[i]);
+
+ //There is work to do...
+ consumers[j].session.mutex.notifyLock();
+ }
+ }
+ } else {
+ throw new JMSException("Multiple dispatch for a Queue");
+ }
}
- public void close() throws Exception
+ public void close() throws Exception
{
closed=true;
}
1.5 +61 -46
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java
Index: DistributedJMSServerOIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOIL.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- DistributedJMSServerOIL.java 2000/11/04 19:24:47 1.4
+++ DistributedJMSServerOIL.java 2000/11/19 20:00:01 1.5
@@ -24,7 +24,7 @@
import java.io.BufferedInputStream;
import java.io.IOException;
-public class DistributedJMSServerOIL
+import org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerOIL
implements Runnable, DistributedJMSServerSetup, DistributedJMSServerOILMBean
{
@@ -53,8 +53,10 @@
static final int ConnectionClosing = 9;
static final int DeleteTemporaryDestination = 10;
static final int CheckID = 11;
- static final int QueueReceiveNoWait = 12;
+ static final int QueueReceive = 12;
static final int ConnectionListening = 13;
+ static final int Acknowledge = 14;
+ static final int SetSpyDistributedConnection = 15;
private ServerSocket serverSocket;
@@ -72,35 +74,35 @@
{
Socket socket = null;
int code = 0;
- BufferedInputStream is=null;
- BufferedOutputStream os=null;
ObjectOutputStream out=null;
ObjectInputStream in=null;
+ SpyDistributedConnection spyDistributedConnection=null;
+ boolean closed = false;
try {
socket = serverSocket.accept();
new Thread(this).start();
-
- is = new BufferedInputStream(socket.getInputStream());
- os = new BufferedOutputStream(socket.getOutputStream());
- out = new ObjectOutputStream(os);
- os.flush();
- in = new ObjectInputStream(is);
+
+ out = new ObjectOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
+ out.flush();
+ in = new ObjectInputStream(new
BufferedInputStream(socket.getInputStream()));
} catch (IOException e) {
failure("Initialisation",e);
return;
}
- while (true) {
+ while (!closed) {
try {
- code=is.read();
+ code=in.readByte();
} catch (IOException e) {
+ if( closed )
+ break;
Log.notice("Command read");
- Log.notice(e);
- return;
+ Log.notice(e);
+ break;
}
try {
@@ -115,11 +117,11 @@
case NewMessage:
newMessage((String)in.readObject(),in.readInt(),in);
break;
- case Subscribe:
-
server.subscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
+ case Subscribe:
+
server.subscribe((Destination)in.readObject(),spyDistributedConnection);
break;
case Unsubscribe:
-
server.unsubscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
+
server.unsubscribe((Destination)in.readObject(),spyDistributedConnection);
break;
case CreateTopic:
result=(Topic)server.createTopic((String)in.readObject());
@@ -128,13 +130,14 @@
result=(Queue)server.createQueue((String)in.readObject());
break;
case GetTemporaryTopic:
-
result=(TemporaryTopic)server.getTemporaryTopic((SpyDistributedConnection)in.readObject());
+
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
break;
case GetTemporaryQueue:
-
result=(TemporaryQueue)server.getTemporaryQueue((SpyDistributedConnection)in.readObject());
+
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
break;
case ConnectionClosing:
-
server.connectionClosing((SpyDistributedConnection)in.readObject(),null);
+
server.connectionClosing(spyDistributedConnection,null);
+ closed=true;
break;
case DeleteTemporaryDestination:
server.deleteTemporaryDestination((SpyDestination)in.readObject());
@@ -142,11 +145,20 @@
case CheckID:
server.checkID((String)in.readObject());
break;
- case QueueReceiveNoWait:
-
result=server.queueReceiveNoWait((Queue)in.readObject());
+ case QueueReceive:
+
result=server.queueReceive((Queue)in.readObject(),
in.readLong(),spyDistributedConnection);
break;
case ConnectionListening:
-
server.connectionListening(((is.read())==1),(Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
+
server.connectionListening(in.readBoolean(),(Destination)in.readObject(),spyDistributedConnection);
+ break;
+ case Acknowledge:
+ SpyAcknowledgementItem items[] = new
SpyAcknowledgementItem[in.readInt()];
+ for( int i=0; i < items.length; i++ )
+ items[i] =
(SpyAcknowledgementItem)in.readObject();
+ server.acknowledge(items,
in.readBoolean(),spyDistributedConnection);
+ break;
+ case SetSpyDistributedConnection:
+ spyDistributedConnection =
(SpyDistributedConnection)in.readObject();
break;
default:
throw new RemoteException("Bad method
code !");
@@ -155,31 +167,45 @@
//Everthing was OK
try {
- if (result==null) os.write(0);
+ if (result==null)
+ out.writeByte(0);
else {
- os.write(1);
+ out.writeByte(1);
out.writeObject(result);
}
- os.flush();
+ out.flush();
} catch (IOException e) {
+ if( closed )
+ break;
failure("Result write",e);
- return;
+ break;
}
} catch (Exception e) {
+ if( closed )
+ break;
try {
- os.write(2);
+ out.writeByte(2);
out.writeObject(e.getMessage());
- os.flush();
+ out.flush();
} catch (IOException e2) {
failure("Result write",e2);
- return;
+ break;
}
}
}
+
+ try {
+ if( !closed )
+
server.connectionClosing(spyDistributedConnection,null);
+ socket.close();
+ } catch (IOException e ) {
+ Log.log("Could not gracefully close the connection with the
client");
+ Log.log(e);
+ }
}
void failure(String st,Exception e)
@@ -190,23 +216,11 @@
void newMessage(String id,int nb,ObjectInputStream in) throws Exception
{
- Log.notice("INCOMING: "+nb+" messages from "+id);
+ SpyMessage mes[] = new SpyMessage[nb];
+ for(int i=0;i<nb;i++)
+ mes[i]=(SpyMessage)in.readObject();
- SpyDestination dest=null;
- JMSServerQueue queue=null;
-
- for(int i=0;i<nb;i++) {
-
- SpyMessage mes=(SpyMessage)in.readObject();
-
- if (dest==null||!dest.equals(mes.jmsDestination)) {
-
queue=(JMSServerQueue)server.messageQueue.get(mes.jmsDestination);
- if (queue==null) throw new JMSException("This
destination does not exist !"); //hum...
- }
-
- //Add the message to the queue
- queue.addMessage(mes);
- }
+ server.newMessage(mes, id);
}
// --
@@ -221,4 +235,5 @@
server=s;
}
+
}
1.3 +78 -39
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java
Index: DistributedJMSServerOILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerOILClient.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- DistributedJMSServerOILClient.java 2000/06/20 02:19:13 1.2
+++ DistributedJMSServerOILClient.java 2000/11/19 20:00:01 1.3
@@ -22,7 +22,7 @@
import java.net.InetAddress;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
-public class DistributedJMSServerOILClient
+import org.spydermq.SpyAcknowledgementItem;public class
DistributedJMSServerOILClient
implements DistributedJMSServer, Serializable
{
@@ -37,14 +37,16 @@
static final int ConnectionClosing = 9;
static final int DeleteTemporaryDestination = 10;
static final int CheckID = 11;
- static final int QueueReceiveNoWait = 12;
+ static final int QueueReceive = 12;
static final int ConnectionListening = 13;
+ static final int Acknowledge = 14;
+ static final int SetSpyDistributedConnection = 15;
//Remote stuff
private transient Socket socket;
- private transient BufferedInputStream is;
- private transient BufferedOutputStream os;
+
+
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
@@ -62,11 +64,9 @@
{
try {
socket=new Socket(addr,port);
- is=new BufferedInputStream(socket.getInputStream());
- os=new BufferedOutputStream(socket.getOutputStream());
- in=new ObjectInputStream(is);
- out=new ObjectOutputStream(os);
- os.flush();
+ in=new ObjectInputStream(new
BufferedInputStream(socket.getInputStream()));
+ out=new ObjectOutputStream(new
BufferedOutputStream(socket.getOutputStream()));
+ out.flush();
} catch (Exception e) {
failure(e);
}
@@ -75,8 +75,8 @@
public Object waitAnswer() throws RemoteException
{
try {
- os.flush();
- int val=is.read();
+ out.flush();
+ int val=in.readByte();
if (val==0) return null;
if (val==1) {
return in.readObject();
@@ -99,12 +99,12 @@
//--- Remote Calls
- public void newMessage(SpyMessage val[],String id) throws JMSException,
RemoteException
+ public void newMessage(SpyMessage val[],String id) throws JMSException,
RemoteException
{
if (socket==null) createConnection();
try {
- os.write(NewMessage);
+ out.writeByte(NewMessage);
out.writeObject(id);
out.writeInt(val.length);
for(int i=0;i<val.length;i++)
@@ -121,7 +121,7 @@
if (socket==null) createConnection();
try {
- os.write(GetID);
+ out.writeByte(GetID);
} catch (IOException e) {
failure(e);
}
@@ -129,14 +129,13 @@
return (String)waitAnswer();
}
- public void subscribe(Destination dest,SpyDistributedConnection who) throws
JMSException, RemoteException
+ public void subscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- os.write(Subscribe);
+ out.writeByte(Subscribe);
out.writeObject(dest);
- out.writeObject(who);
} catch (IOException e) {
failure(e);
}
@@ -144,14 +143,13 @@
waitAnswer();
}
- public void unsubscribe(Destination dest,SpyDistributedConnection who) throws
JMSException, RemoteException
+ public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- os.write(Unsubscribe);
+ out.writeByte(Unsubscribe);
out.writeObject(dest);
- out.writeObject(who);
} catch (IOException e) {
failure(e);
}
@@ -164,7 +162,7 @@
if (socket==null) createConnection();
try {
- os.write(CreateTopic);
+ out.writeByte(CreateTopic);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
@@ -178,7 +176,7 @@
if (socket==null) createConnection();
try {
- os.write(CreateQueue);
+ out.writeByte(CreateQueue);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
@@ -192,8 +190,7 @@
if (socket==null) createConnection();
try {
- os.write(GetTemporaryTopic);
- out.writeObject(dc);
+ out.writeByte(GetTemporaryTopic);
} catch (IOException e) {
failure(e);
}
@@ -206,8 +203,7 @@
if (socket==null) createConnection();
try {
- os.write(GetTemporaryQueue);
- out.writeObject(dc);
+ out.writeByte(GetTemporaryQueue);
} catch (IOException e) {
failure(e);
}
@@ -220,8 +216,7 @@
if (socket==null) createConnection();
try {
- os.write(ConnectionClosing);
- out.writeObject(dc);
+ out.writeByte(ConnectionClosing);
} catch (IOException e) {
failure(e);
}
@@ -234,7 +229,7 @@
if (socket==null) createConnection();
try {
- os.write(DeleteTemporaryDestination);
+ out.writeByte(DeleteTemporaryDestination);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
@@ -248,7 +243,7 @@
if (socket==null) createConnection();
try {
- os.write(CheckID);
+ out.writeByte(CheckID);
out.writeObject(ID);
} catch (IOException e) {
failure(e);
@@ -256,35 +251,79 @@
waitAnswer();
}
+
- public SpyMessage queueReceiveNoWait(Queue queue) throws Exception,
RemoteException
+ public void connectionListening(boolean mode,Destination dest,
SpyDistributedConnection dc) throws Exception, RemoteException
{
if (socket==null) createConnection();
+ try {
+ out.writeByte(ConnectionListening);
+ out.writeBoolean(mode);
+ out.writeObject(dest);
+ } catch (IOException e) {
+ failure(e);
+ }
+
+ waitAnswer();
+ }
+
+ public void acknowledge(SpyAcknowledgementItem item[],boolean
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException {
+ if (socket==null) createConnection();
+
try {
- os.write(QueueReceiveNoWait);
+ out.writeByte(Acknowledge);
+ out.writeInt(item.length);
+ for( int i=0; i< item.length; i++ )
+ out.writeObject(item[i]);
+ out.writeBoolean(isAck);
+ } catch (IOException e) {
+ failure(e);
+ }
+
+ waitAnswer();
+ }
+
+ public SpyMessage queueReceive(Queue queue, long wait) throws Exception,
RemoteException
+ {
+ if (socket==null) createConnection();
+
+ try {
+ out.writeByte(QueueReceive);
out.writeObject(queue);
+ out.writeLong(wait);
} catch (IOException e) {
failure(e);
}
return (SpyMessage)waitAnswer();
- }
+ }
- public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws Exception, RemoteException
+ public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws Exception, RemoteException
{
if (socket==null) createConnection();
try {
- os.write(ConnectionListening);
- if (mode) os.write(1);
- else os.write(0);
- out.writeObject(dest);
- out.writeObject(dc);
+ out.writeByte(QueueReceive);
+ out.writeObject(queue);
+ out.writeLong(wait);
} catch (IOException e) {
failure(e);
}
+ return (SpyMessage)waitAnswer();
+ }
+
+
+ public void setSpyDistributedConnection(SpyDistributedConnection dest) throws
RemoteException {
+ if (socket==null) createConnection();
+
+ try {
+ out.writeByte( SetSpyDistributedConnection );
+ out.writeObject(dest);
+ } catch (IOException e) {
+ failure(e);
+ }
waitAnswer();
}
1.2 +15 -12
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java
Index: DistributedJMSServerRMI.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMI.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- DistributedJMSServerRMI.java 2000/05/31 18:10:16 1.1
+++ DistributedJMSServerRMI.java 2000/11/19 20:00:01 1.2
@@ -12,36 +12,39 @@
import javax.jms.Queue;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
import org.spydermq.SpyMessage;
import org.spydermq.SpyDestination;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
-import java.rmi.Remote;
-import java.rmi.RemoteException;
+import org.spydermq.SpyAcknowledgementItem;
/**
* The RMI interface of the DistributedJMSServer object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface DistributedJMSServerRMI extends DistributedJMSServer, Remote
{
+
// Public --------------------------------------------------------
-
public String getID() throws JMSException, RemoteException;
- public void newMessage(SpyMessage val[],String id) throws JMSException,
RemoteException;
- public void subscribe(Destination dest,SpyDistributedConnection who) throws
JMSException, RemoteException;
- public void unsubscribe(Destination dest,SpyDistributedConnection who) throws
JMSException, RemoteException;
+ public void newMessage(SpyMessage val[],String id) throws JMSException,
RemoteException;
public Topic createTopic(String dest) throws JMSException, RemoteException;
public Queue createQueue(String dest) throws JMSException, RemoteException;
- public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException;
- public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException;
- public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException;
public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException, RemoteException;
public void checkID(String ID) throws JMSException, RemoteException;
- public SpyMessage queueReceiveNoWait(Queue queue) throws Exception,
RemoteException;
+ public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection
newSpyDistributedConnection) throws RemoteException;
+ public void acknowledge(SpyAcknowledgementItem[] items, boolean
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException;
+ public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException;
public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws Exception, RemoteException;
-
+ public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException;
+ public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException;
+ public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws Exception, RemoteException;
+ public void subscribe(Destination dest, SpyDistributedConnection dc) throws
JMSException, RemoteException;
+ public void unsubscribe(Destination dest, SpyDistributedConnection dc) throws
JMSException, RemoteException;
+
}
1.4 +49 -40
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java
Index: DistributedJMSServerRMIImpl.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerRMIImpl.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- DistributedJMSServerRMIImpl.java 2000/06/19 04:23:15 1.3
+++ DistributedJMSServerRMIImpl.java 2000/11/19 20:00:01 1.4
@@ -12,22 +12,23 @@
import javax.jms.Queue;
import javax.jms.TemporaryTopic;
import javax.jms.TemporaryQueue;
+import java.rmi.server.UnicastRemoteObject;
+import java.rmi.RemoteException;
import org.spydermq.SpyMessage;
import org.spydermq.SpyDestination;
import org.spydermq.JMSServer;
import org.spydermq.SpyDistributedConnection;
import org.spydermq.Log;
-import java.rmi.server.UnicastRemoteObject;
-import java.rmi.RemoteException;
import org.spydermq.distributed.interfaces.DistributedJMSServer;
import org.spydermq.distributed.interfaces.DistributedJMSServerSetup;
+import org.spydermq.SpyAcknowledgementItem;
/**
* The RMI implementation of the DistributedJMSServer object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class DistributedJMSServerRMIImpl
extends UnicastRemoteObject
@@ -46,27 +47,16 @@
}
// Public --------------------------------------------------------
-
public String getID() throws JMSException
{
return server.getID();
}
- public void newMessage(SpyMessage val[],String id) throws JMSException
+ public void newMessage(SpyMessage val[],String id) throws JMSException
{
server.newMessage(val,id);
}
- public void subscribe(Destination dest,SpyDistributedConnection who) throws
JMSException
- {
- server.subscribe(dest,who);
- }
-
- public void unsubscribe(Destination dest,SpyDistributedConnection who) throws
JMSException
- {
- server.unsubscribe(dest,who);
- }
-
public Topic createTopic(String dest) throws JMSException
{
return server.createTopic(dest);
@@ -76,21 +66,6 @@
{
return server.createQueue(dest);
}
-
- public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException
- {
- return server.getTemporaryTopic(dc);
- }
-
- public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException
- {
- return server.getTemporaryQueue(dc);
- }
-
- public void connectionClosing(SpyDistributedConnection dc) throws JMSException
- {
- server.connectionClosing(dc,null);
- }
public void deleteTemporaryDestination(SpyDestination dest) throws JMSException
{
@@ -101,16 +76,6 @@
{
server.checkID(ID);
}
-
- public SpyMessage queueReceiveNoWait(Queue queue) throws JMSException
- {
- return server.queueReceiveNoWait(queue);
- }
-
- public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws JMSException
- {
- server.connectionListening(mode,dest,dc);
- }
// --
@@ -122,6 +87,50 @@
public void setServer(JMSServer s)
{
server=s;
+ }
+
+ public void setSpyDistributedConnection(org.spydermq.SpyDistributedConnection
newSpyDistributedConnection) {
+ // We cannot try to cache the dc since different dc's are going
+ // to access the same remote object via RMI
+ }
+
+ public void acknowledge(SpyAcknowledgementItem[] items, boolean
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException{
+ server.acknowledge(items, isAck, dc);
+ }
+
+ public void connectionClosing(SpyDistributedConnection dc) throws JMSException
+ {
+ server.connectionClosing(dc,null);
+ }
+
+ public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws JMSException
+ {
+ server.connectionListening(mode,dest,dc);
+ }
+
+ public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException
+ {
+ return server.getTemporaryQueue(dc);
+ }
+
+ public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException
+ {
+ return server.getTemporaryTopic(dc);
+ }
+
+ public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws JMSException
+ {
+ return server.queueReceive(queue, wait, dc);
+ }
+
+ public void subscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException
+ {
+ server.subscribe(dest,dc);
+ }
+
+ public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException
+ {
+ server.unsubscribe(dest,dc);
}
}
1.2 +81 -127
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java
Index: ConnectionReceiverUIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUIL.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ConnectionReceiverUIL.java 2000/11/16 22:46:18 1.1
+++ ConnectionReceiverUIL.java 2000/11/19 20:00:01 1.2
@@ -9,10 +9,10 @@
import javax.jms.Destination;
import javax.jms.JMSException;
import org.spydermq.SpyConnection;
-import org.spydermq.ConnectionQueue;
+
import org.spydermq.SpyMessage;
-import org.spydermq.SpySession;
-import org.spydermq.SessionQueue;
+
+
import org.spydermq.SpyDestination;
import org.spydermq.SpyTopicConnection;
import org.spydermq.SpyQueueSession;
@@ -35,13 +35,13 @@
import java.io.ObjectOutputStream;
import java.io.IOException;
-/**
+import org.spydermq.SpyMessageConsumer;/**
* The OIL implementation of the ConnectionReceiver object
*
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class ConnectionReceiverUIL
implements Runnable, ConnectionReceiverSetup
@@ -71,7 +71,9 @@
void exportObject()
{
- new Thread(this).start();
+ Thread thread = new Thread(this, "ConnectionReceiverUIL");
+ thread.setDaemon(true);
+ thread.start();
}
public void run()
@@ -94,14 +96,17 @@
return;
}
- while (true) {
+ while (!closed) {
try {
- code=in.read();
+ code=in.readByte();
} catch (IOException e) {
+ if( closed )
+ break;
+
failure("Command read",e);
e.printStackTrace();
- return;
+ break;
}
try {
@@ -130,32 +135,48 @@
//Everthing was OK
try {
- out.write(0);
+ out.writeByte(0);
out.flush();
} catch (IOException e) {
+ if( closed )
+ break;;
+
failure("Result write",e);
- return;
+ break;
}
- } catch (Exception e) {
+ } catch (Exception e) {
+
+ if( closed )
+ break;
try {
if( e instanceof NoReceiverException ) {
- out.write(2);
+ out.writeByte(2);
out.writeObject(e.getMessage());
} else {
- out.write(1);
+ out.writeByte(1);
out.writeObject(e);
}
out.flush();
} catch (IOException e2) {
failure("Result write",e2);
- return;
+ break;
}
}
}
+
+ try {
+ Log.log("Closing receiver connections.");
+ out.close();
+ in.close();
+ } catch ( IOException e ) {
+ Log.log("Error whle closing receiver connections ");
+ Log.log(e);
+ return;
+ }
}
void failure(String st,Exception e)
@@ -179,143 +200,76 @@
return new ConnectionReceiverUILClient();
}
- // ---
-
- //<DEBUG>
-
- /*public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
- {
- connection.rec++;
- }
-
- public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
JMSException
- {
- connection.rec++;
- }*/
-
- //</DEBUG>
-
-
//A message has arrived for this Connection, We have to dispatch it to the
sessions
- public void receive(SpyDestination dest,SpyMessage mes) throws JMSException
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
-
- Log.log("ConnectionReceiver:
Receive(Destination="+dest.toString()+",Mes="+mes.toString()+")");
-
+ public void receive(SpyDestination dest, SpyMessage mes) throws JMSException {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+
+ Log.log("ConnectionReceiver: Receive(Destination=" + dest.toString() +
",Mes=" + mes.toString() + ")");
+
if (connection instanceof SpyTopicConnection) {
-
+
//Get the set of subscribers for this Topic
-
- ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) return;
-
- Iterator i=connectionQueue.subscribers.iterator();
-
- while (i.hasNext()) {
-
- SpySession session=(SpySession)i.next();
-
- //add the new message to the session's queue
- session.dispatchMessage(dest,mes);
-
+ SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+
+ for (int i = 0; i < consumers.length; i++) {
+
+ //add the new message to the consumer's queue
+ consumers[i].addMessage(mes);
+
//There is work to do...
- session.mutex.notifyLock();
+ consumers[i].session.mutex.notifyLock();
}
+
} else {
-
- while (true) {
-
- SessionQueue sq=null;
- try {
-
- //Find one session waiting for this Queue
- if (connection.modeStop) throw new
Exception("This connection is stopped !");
- ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) throw new
Exception("There is no connectionQueue for this destination !");
-
- synchronized (connectionQueue) {
-
- //Find a SessionQueue
- if
(connectionQueue.NumListeningSessions==0) throw new NoReceiverException("There are no
listening sessions for this destination !");
-
- Iterator
i=connectionQueue.subscribers.iterator();
- while (i.hasNext()) {
- SpySession
session=(SpySession)i.next();
-
sq=(SessionQueue)session.destinations.get(dest);
- if
(sq.NumListeningSubscribers!=0) break;
- }
- if
(sq==null||sq.NumListeningSubscribers==0) {
- Log.error("FIXME: The
listeners count was invalid !");
- throw new
NoReceiverException("There are no listening sessions for this destination !");
- }
-
- //Try with this sessionQueue
- Log.log("Dispatching to SessionQueue:
"+mes);
- sq.dispatchMessage(dest,mes);
-
- //Our work is done here
- break;
- }
+ //Find one session waiting for this Queue
+ if (connection.modeStop)
+ throw new JMSException("This connection is stopped !");
+
+ SpyMessageConsumer consumer =
connection.pickListeningConsumer(dest);
+ if (consumer == null)
+ throw new NoReceiverException("There are no listening
sessions for this destination !");
+
+ //Try with this sessionQueue
+ Log.log("Dispatching to SessionQueue: " + mes);
+ ((org.spydermq.SpyQueueReceiver)
consumer).dispatchMessage(mes);
- } catch (NoReceiverException e) {
- //This SessionQueue should not have been
registered !
- throw e;
- } catch (Exception e) {
- //This error is non-recoverable : we must
unregister from this queue
- //Let the JMSServerQueue do its work
- Log.log(e);
- throw new JMSException("There are no listening
sessions in this connection");
- }
- }
-
}
-
}
- public void receiveMultiple(SpyDestination dest,int nb,ObjectInputStream in)
throws Exception
- {
- if (closed) throw new IllegalStateException("The connection is
closed");
-
+ public void receiveMultiple(SpyDestination dest, int nb, ObjectInputStream in)
throws Exception {
+ if (closed)
+ throw new IllegalStateException("The connection is closed");
+
Log.log("ConnectionReceiver: ReceiveMultiple()");
-
+
if (connection instanceof SpyTopicConnection) {
-
+
//Get the set of subscribers for this Topic
-
- ConnectionQueue
connectionQueue=(ConnectionQueue)connection.destinations.get(dest);
- if (connectionQueue==null) return;
-
- for(int val=0;val<nb;val++) {
-
- SpyMessage mes=(SpyMessage)in.readObject();
-
- //NL: i is a short-lived object. Try to "group"
messages in an pre-allocated/peristant
- //array and apply the same iterator on this array
- Iterator i=connectionQueue.subscribers.iterator();
-
- while (i.hasNext()) {
-
- SpySession session=(SpySession)i.next();
-
- //add the new message to the session's queue
- session.dispatchMessage(dest,mes);
-
+ SpyMessageConsumer consumers[] = connection.getConsumers(dest);
+
+ for (int val = 0; val < nb; val++) {
+ SpyMessage mes = (SpyMessage) in.readObject();
+
+ for (int i = 0; i < consumers.length; i++) {
+
+ //add the new message to the consumer's queue
+ consumers[i].addMessage(mes);
+
//There is work to do...
- session.mutex.notifyLock();
+ consumers[i].session.mutex.notifyLock();
}
}
-
} else {
- throw new Exception("Multiple dispatch for a Queue");
+ throw new Exception("Multiple dispatch for a Queue");
}
}
public void close() throws Exception
{
- closed=true;
+ closed=true;
}
//One TemporaryDestination has been deleted
1.2 +11 -15
spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java
Index: ConnectionReceiverUILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/ConnectionReceiverUILClient.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- ConnectionReceiverUILClient.java 2000/11/16 22:46:19 1.1
+++ ConnectionReceiverUILClient.java 2000/11/19 20:00:02 1.2
@@ -30,7 +30,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
import org.spydermq.multiplexor.SocketMultiplexor;public class
ConnectionReceiverUILClient
implements ConnectionReceiver, Serializable
@@ -40,17 +40,11 @@
static final int DELETE_TEMPORARY_DESTINATION = 3;
static final int CLOSE = 4;
-
-
-
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
-
-
-
+ transient SocketMultiplexor mSocket;
-
-
+
void createConnection() throws RemoteException
{
try {
@@ -68,7 +62,7 @@
Exception throwException=null;
try {
out.flush();
- int val=in.read();
+ int val=in.readByte();
switch(val) {
case 1:
Exception e=(Exception)in.readObject();
@@ -91,7 +85,7 @@
public void receive(SpyDestination dest,SpyMessage mes) throws Exception
{
if (out==null) createConnection();
- out.write(RECEIVE);
+ out.writeByte(RECEIVE);
out.writeObject(dest);
out.writeObject(mes);
waitAnswer();
@@ -100,7 +94,7 @@
public void receiveMultiple(SpyDestination dest,SpyMessage mes[]) throws
Exception
{
if (out==null) createConnection();
- out.write(RECEIVE_MULTIPLE);
+ out.writeByte(RECEIVE_MULTIPLE);
out.writeObject(dest);
out.writeInt(mes.length);
for(int i=0;i<mes.length;i++)
@@ -111,7 +105,7 @@
public void deleteTemporaryDestination(SpyDestination dest) throws Exception
{
if (out==null) createConnection();
- out.write(DELETE_TEMPORARY_DESTINATION);
+ out.writeByte(DELETE_TEMPORARY_DESTINATION);
out.writeObject(dest);
waitAnswer();
}
@@ -119,10 +113,12 @@
public void close() throws Exception
{
if (out==null) createConnection();
- out.write(CLOSE);
+ out.writeByte(CLOSE);
waitAnswer();
}
- transient SocketMultiplexor mSocket; public ConnectionReceiverUILClient()
+ public ConnectionReceiverUILClient()
{
- }}
+ }
+
+}
1.2 +58 -41
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java
Index: DistributedJMSServerUIL.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUIL.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- DistributedJMSServerUIL.java 2000/11/16 22:46:19 1.1
+++ DistributedJMSServerUIL.java 2000/11/19 20:00:02 1.2
@@ -36,9 +36,9 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
-import org.spydermq.multiplexor.SocketMultiplexor;public class
DistributedJMSServerUIL
+import org.spydermq.multiplexor.SocketMultiplexor;import
org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerUIL
implements Runnable, DistributedJMSServerSetup, DistributedJMSServerUILMBean
{
@@ -55,7 +55,7 @@
}
// Internals -----------------------------------------------------
-
+
static final int GetID = 1;
static final int NewMessage = 2;
static final int Subscribe = 3;
@@ -67,8 +67,10 @@
static final int ConnectionClosing = 9;
static final int DeleteTemporaryDestination = 10;
static final int CheckID = 11;
- static final int QueueReceiveNoWait = 12;
+ static final int QueueReceive = 12;
static final int ConnectionListening = 13;
+ static final int Acknowledge = 14;
+ static final int SetSpyDistributedConnection = 15;
private ServerSocket serverSocket;
@@ -89,6 +91,8 @@
int code = 0;
ObjectOutputStream out=null;
ObjectInputStream in=null;
+ SpyDistributedConnection spyDistributedConnection=null;
+ boolean closed = false;
try {
socket = serverSocket.accept();
@@ -111,14 +115,14 @@
}
- while (true) {
+ while (!closed) {
try {
- code=in.read();
+ code=in.readByte();
} catch (IOException e) {
Log.notice("Command read");
Log.notice(e);
- return;
+ break;
}
try {
@@ -134,14 +138,10 @@
newMessage((String)in.readObject(),in.readInt(),in);
break;
case Subscribe:
- Destination d =
(Destination)in.readObject();
- SpyDistributedConnection c =
(SpyDistributedConnection)in.readObject();
- if( c.cr instanceof
ConnectionReceiverUILClient )
-
((ConnectionReceiverUILClient)c.cr).mSocket = mSocket;
- server.subscribe(d,c);
+
server.subscribe((Destination)in.readObject(),spyDistributedConnection);
break;
case Unsubscribe:
-
server.unsubscribe((Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
+
server.unsubscribe((Destination)in.readObject(),spyDistributedConnection);
break;
case CreateTopic:
result=(Topic)server.createTopic((String)in.readObject());
@@ -150,13 +150,14 @@
result=(Queue)server.createQueue((String)in.readObject());
break;
case GetTemporaryTopic:
-
result=(TemporaryTopic)server.getTemporaryTopic((SpyDistributedConnection)in.readObject());
+
result=(TemporaryTopic)server.getTemporaryTopic(spyDistributedConnection);
break;
case GetTemporaryQueue:
-
result=(TemporaryQueue)server.getTemporaryQueue((SpyDistributedConnection)in.readObject());
+
result=(TemporaryQueue)server.getTemporaryQueue(spyDistributedConnection);
break;
case ConnectionClosing:
-
server.connectionClosing((SpyDistributedConnection)in.readObject(),null);
+
server.connectionClosing(spyDistributedConnection,null);
+ closed = true;
break;
case DeleteTemporaryDestination:
server.deleteTemporaryDestination((SpyDestination)in.readObject());
@@ -164,12 +165,25 @@
case CheckID:
server.checkID((String)in.readObject());
break;
- case QueueReceiveNoWait:
-
result=server.queueReceiveNoWait((Queue)in.readObject());
+ case QueueReceive:
+
result=server.queueReceive((Queue)in.readObject(),
in.readLong(),spyDistributedConnection);
break;
case ConnectionListening:
-
server.connectionListening(((in.read())==1),(Destination)in.readObject(),(SpyDistributedConnection)in.readObject());
+
server.connectionListening(in.readBoolean(),(Destination)in.readObject(),spyDistributedConnection);
break;
+ case Acknowledge:
+ SpyAcknowledgementItem items[] = new
SpyAcknowledgementItem[in.readInt()];
+ for( int i=0; i < items.length; i++ )
+ items[i] =
(SpyAcknowledgementItem)in.readObject();
+ server.acknowledge(items,
in.readBoolean(),spyDistributedConnection);
+ break;
+ case SetSpyDistributedConnection:
+ spyDistributedConnection =
(SpyDistributedConnection)in.readObject();
+ if( spyDistributedConnection.cr
instanceof ConnectionReceiverUILClient ) {
+
((ConnectionReceiverUILClient)spyDistributedConnection.cr).mSocket = mSocket;
+
((ConnectionReceiverUILClient)spyDistributedConnection.cr).createConnection();
+ }
+ break;
default:
throw new RemoteException("Bad method
code !");
}
@@ -177,29 +191,45 @@
//Everthing was OK
try {
- if (result==null) out.write(0);
+ if (result==null)
+ out.writeByte(0);
else {
- out.write(1);
+ out.writeByte(1);
out.writeObject(result);
}
out.flush();
} catch (IOException e) {
+ if( closed )
+ break;
failure("Result write",e);
- return;
+ break;
}
} catch (Exception e) {
+ if( closed )
+ break;
try {
- out.write(2);
+ out.writeByte(2);
out.writeObject(e);
out.flush();
} catch (IOException e2) {
failure("Result write",e2);
- return;
+ break;
}
}
}
+
+ try {
+
+ if( !closed )
+
server.connectionClosing(spyDistributedConnection,null);
+
+ mSocket.close();
+ } catch ( IOException e ) {
+ Log.log("Could not gracefully close the connection to the
server.");
+ Log.log(e);
+ }
}
void failure(String st,Exception e)
@@ -210,27 +240,14 @@
void newMessage(String id,int nb,ObjectInputStream in) throws Exception
{
- Log.notice("INCOMING: "+nb+" messages from "+id);
-
- SpyDestination dest=null;
- JMSServerQueue queue=null;
-
- for(int i=0;i<nb;i++) {
-
- SpyMessage mes=(SpyMessage)in.readObject();
-
- if (dest==null||!dest.equals(mes.jmsDestination)) {
-
queue=(JMSServerQueue)server.messageQueue.get(mes.jmsDestination);
- if (queue==null) throw new JMSException("This
destination does not exist !"); //hum...
- }
+ SpyMessage mes[] = new SpyMessage[nb];
+ for(int i=0;i<nb;i++)
+ mes[i]=(SpyMessage)in.readObject();
- //Add the message to the queue
- queue.addMessage(mes);
- }
+ server.newMessage(mes, id);
}
-
- // --
+ // --
public DistributedJMSServer createClient() throws Exception
{
return new
DistributedJMSServerUILClient(InetAddress.getLocalHost(),serverSocket.getLocalPort());
1.2 +93 -58
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java
Index: DistributedJMSServerUILClient.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILClient.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- DistributedJMSServerUILClient.java 2000/11/16 22:46:20 1.1
+++ DistributedJMSServerUILClient.java 2000/11/19 20:00:02 1.2
@@ -34,9 +34,9 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
-import org.spydermq.multiplexor.SocketMultiplexor;public class
DistributedJMSServerUILClient
+import org.spydermq.multiplexor.SocketMultiplexor;import
org.spydermq.SpyAcknowledgementItem;public class DistributedJMSServerUILClient
implements DistributedJMSServer, Serializable
{
@@ -51,19 +51,22 @@
static final int ConnectionClouting = 9;
static final int DeleteTemporaryDestination = 10;
static final int CheckID = 11;
- static final int QueueReceiveNoWait = 12;
+ static final int QueueReceive = 12;
static final int ConnectionListening = 13;
+ static final int Acknowledge = 14;
+ static final int SetSpyDistributedConnection = 15;
+
//Remote stuff
+ private int port;
+ private InetAddress addr;
private transient Socket socket;
+ transient SocketMultiplexor mSocket;
-
private transient ObjectOutputStream out;
private transient ObjectInputStream in;
- private int port;
- private InetAddress addr;
public DistributedJMSServerUILClient(InetAddress addr,int port)
{
@@ -95,7 +98,7 @@
{
try {
out.flush();
- int val=in.read();
+ int val=in.readByte();
if (val==0) return null;
if (val==1) {
return in.readObject();
@@ -126,7 +129,7 @@
if (socket==null) createConnection();
try {
- out.write(NewMessage);
+ out.writeByte(NewMessage);
out.writeObject(id);
out.writeInt(val.length);
for(int i=0;i<val.length;i++)
@@ -143,7 +146,7 @@
if (socket==null) createConnection();
try {
- out.write(GetID);
+ out.writeByte(GetID);
} catch (IOException e) {
failure(e);
}
@@ -151,162 +154,194 @@
return (String)waitAnswer();
}
- public void subscribe(Destination dest,SpyDistributedConnection who) throws
JMSException, RemoteException
+
+
+
+
+ public Topic createTopic(String dest) throws JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(Subscribe);
+ out.writeByte(CreateTopic);
out.writeObject(dest);
- out.writeObject(who);
} catch (IOException e) {
failure(e);
}
- waitAnswer();
+ return (Topic)waitAnswer();
}
- public void unsubscribe(Destination dest,SpyDistributedConnection who) throws
JMSException, RemoteException
+ public Queue createQueue(String dest) throws JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(Unsubscribe);
+ out.writeByte(CreateQueue);
out.writeObject(dest);
- out.writeObject(who);
} catch (IOException e) {
failure(e);
}
- waitAnswer();
+ return (Queue)waitAnswer();
}
- public Topic createTopic(String dest) throws JMSException, RemoteException
+
+
+
+
+
+
+ public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(CreateTopic);
+ out.writeByte(DeleteTemporaryDestination);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
- return (Topic)waitAnswer();
+ waitAnswer();
}
- public Queue createQueue(String dest) throws JMSException, RemoteException
+ public void checkID(String ID) throws JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(CreateQueue);
- out.writeObject(dest);
+ out.writeByte(CheckID);
+ out.writeObject(ID);
} catch (IOException e) {
failure(e);
}
- return (Queue)waitAnswer();
+ waitAnswer();
}
- public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException
- {
+ public void setSpyDistributedConnection(SpyDistributedConnection dest) throws
RemoteException {
if (socket==null) createConnection();
try {
- out.write(GetTemporaryTopic);
- out.writeObject(dc);
+ out.writeByte( SetSpyDistributedConnection );
+ out.writeObject(dest);
} catch (IOException e) {
failure(e);
+ }
+ waitAnswer();
+ }
+
+ public void acknowledge(SpyAcknowledgementItem item[],boolean
isAck,SpyDistributedConnection dc) throws JMSException, RemoteException {
+ if (socket==null) createConnection();
+
+ try {
+ out.writeByte(Acknowledge);
+ out.writeInt(item.length);
+ for( int i=0; i< item.length; i++ )
+ out.writeObject(item[i]);
+ out.writeBoolean(isAck);
+ } catch (IOException e) {
+ failure(e);
}
- return (TemporaryTopic)waitAnswer();
- }
+ waitAnswer();
+ }
- public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException
+ public void connectionClosing(SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(GetTemporaryQueue);
- out.writeObject(dc);
+ out.writeByte(ConnectionClouting);
} catch (IOException e) {
failure(e);
}
-
- return (TemporaryQueue)waitAnswer();
- }
-
-
+ waitAnswer();
+ }
- public void deleteTemporaryDestination(SpyDestination dest) throws
JMSException, RemoteException
+ public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws Exception, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(DeleteTemporaryDestination);
+ out.writeByte(ConnectionListening);
+ out.writeBoolean(mode);
out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
waitAnswer();
- }
+ }
- public void checkID(String ID) throws JMSException, RemoteException
+ public TemporaryQueue getTemporaryQueue(SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(CheckID);
- out.writeObject(ID);
+ out.writeByte(GetTemporaryQueue);
} catch (IOException e) {
failure(e);
}
- waitAnswer();
- }
+ return (TemporaryQueue)waitAnswer();
+ }
- public SpyMessage queueReceiveNoWait(Queue queue) throws Exception,
RemoteException
+ public TemporaryTopic getTemporaryTopic(SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(QueueReceiveNoWait);
+ out.writeByte(GetTemporaryTopic);
+ } catch (IOException e) {
+ failure(e);
+ }
+
+ return (TemporaryTopic)waitAnswer();
+ }
+
+ public SpyMessage queueReceive(Queue queue, long wait,SpyDistributedConnection
dc) throws Exception, RemoteException
+ {
+ if (socket==null) createConnection();
+
+ try {
+ out.writeByte(QueueReceive);
out.writeObject(queue);
+ out.writeLong(wait);
} catch (IOException e) {
failure(e);
}
return (SpyMessage)waitAnswer();
- }
+ }
- public void connectionListening(boolean mode,Destination
dest,SpyDistributedConnection dc) throws Exception, RemoteException
+ public void subscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(ConnectionListening);
- if (mode) out.write(1);
- else out.write(0);
+ out.writeByte(Subscribe);
out.writeObject(dest);
- out.writeObject(dc);
} catch (IOException e) {
failure(e);
}
waitAnswer();
- }
+ }
- static final int ConnectionOneTimeListener = 14; transient
SocketMultiplexor mSocket; public void connectionClosing(SpyDistributedConnection
dc) throws JMSException, RemoteException
+ public void unsubscribe(Destination dest,SpyDistributedConnection dc) throws
JMSException, RemoteException
{
if (socket==null) createConnection();
try {
- out.write(ConnectionClouting);
- out.writeObject(dc);
+ out.writeByte(Unsubscribe);
+ out.writeObject(dest);
} catch (IOException e) {
failure(e);
}
+
waitAnswer();
- }}
+ }
+
+}
1.2 +0 -0
spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILMBean.java
Index: DistributedJMSServerUILMBean.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/distributed/server/DistributedJMSServerUILMBean.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- DistributedJMSServerUILMBean.java 2000/11/16 22:46:20 1.1
+++ DistributedJMSServerUILMBean.java 2000/11/19 20:00:02 1.2
@@ -14,7 +14,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public interface DistributedJMSServerUILMBean
{