User: norbert
Date: 00/06/04 20:19:24
Modified: src/java/org/spydermq ConnectionQueue.java
JMSServerQueue.java SessionQueue.java
SpyConnection.java SpyQueueReceiver.java
SpyQueueSession.java
Log:
Add synchronization
Local optimization
Revision Changes Path
1.3 +20 -16 spyderMQ/src/java/org/spydermq/ConnectionQueue.java
Index: ConnectionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/ConnectionQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- ConnectionQueue.java 2000/05/31 23:22:55 1.2
+++ ConnectionQueue.java 2000/06/05 03:19:23 1.3
@@ -16,7 +16,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class ConnectionQueue
{
@@ -58,26 +58,30 @@
return subscribers.size()==0;
}
- synchronized void changeNumListening(int val) throws JMSException
+ void changeNumListening(int val) throws JMSException
{
- NumListeningSessions+=val;
+ //Before check
+
+ //Synchronized part
+ synchronized (this) {
+ NumListeningSessions+=val;
- Log.log("ConnectionQueue: changeNumListening(listening
sessions="+NumListeningSessions+")");
+ Log.log("ConnectionQueue: changeNumListening(listening
sessions="+NumListeningSessions+")");
- if (connection.modeStop) return;
+ if (connection.modeStop) return;
- try {
-
- if (val==-1&&NumListeningSessions==0) {
-
connection.provider.connectionListening(false,destination,connection.distributedConnection);
- } else if (val==1&&NumListeningSessions==1) {
-
connection.provider.connectionListening(true,destination,connection.distributedConnection);
+ try {
+
+ if (val==-1&&NumListeningSessions==0) {
+
connection.provider.connectionListening(false,destination,connection.distributedConnection);
+ } else if (val==1&&NumListeningSessions==1) {
+
connection.provider.connectionListening(true,destination,connection.distributedConnection);
+ }
+
+ } catch (Exception e) {
+ connection.failureHandler(e,"Cannot contact the JMS
server");
}
-
- } catch (Exception e) {
- connection.failureHandler(e,"Cannot contact the JMS server");
- }
-
+ }
}
synchronized void start() throws JMSException
1.8 +86 -74 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- JMSServerQueue.java 2000/06/01 02:47:48 1.7
+++ JMSServerQueue.java 2000/06/05 03:19:23 1.8
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class JMSServerQueue
{
@@ -265,67 +265,72 @@
} else {
- while (true) {
+ synchronized (this) {
+ //In the Queue case, we synchronize on [this] to avoid
changes (listening modifications)
+ //while we are dispatching messages
- //At first, find a receiver
- //NL: We could find a better receiver (load balancing
?)
-
- Log.log("get a receiver");
-
- if (listeners==0) break;
- Iterator i=subscribers.values().iterator();
- SpyDistributedConnection dc=null;
- while (i.hasNext()) {
- dc=(SpyDistributedConnection)i.next();
- if (dc.listeners) break;
- }
-
- if (dc==null||!dc.listeners) {
- Log.error("FIXME: The listeners count was
invalid !");
- break;
- }
-
- //Get the message ( if there is one message pending )
- SpyMessage mes=startWorkQueue();
- if (mes==null) break;
- if (mes.isOutdated()) continue;
-
- //Send the message
- try {
- dc.cr.receive(destination,mes);
- } catch (NoReceiverException e) {
+ while (true) {
+
+ //At first, find a receiver
+ //NL: We could find a better receiver (load
balancing ?)
+
+ Log.log("get a receiver");
+
+ if (listeners==0) break;
+ Iterator i=subscribers.values().iterator();
+ SpyDistributedConnection dc=null;
+ while (i.hasNext()) {
+ dc=(SpyDistributedConnection)i.next();
+ if (dc.listeners) break;
+ }
- Log.log(e);
+ if (dc==null||!dc.listeners) {
+ Log.error("FIXME: The listeners count
was invalid !");
+ break;
+ }
+ //Get the message ( if there is one message
pending )
+ SpyMessage mes=startWorkQueue();
+ if (mes==null) break;
+ if (mes.isOutdated()) continue;
+
+ //Send the message
try {
- addMessage(mes);
- connectionListening(false,dc);
- } catch (Exception e2) {
- Log.error(e2);
- }
+ dc.cr.receive(destination,mes);
+ } catch (NoReceiverException e) {
+
+ Log.log(e);
- } catch (JMSException e) {
- throw e;
- } catch (Exception e) {
- //This is a transport failure. We should
define our own Transport Failure class
- //to catch errors more precisely.
-
- try {
- addMessage(mes);
- } catch (Exception e2) {
- Log.error(e2);
- }
-
- Log.error("Cannot deliver this message to the
client "+dc);
- Log.error(e);
- handleConnectionFailure(dc,null);
- }
-
- }
-
- //Notify that it has finished its work : another thread can
start working on this queue
- endWork();
+ try {
+ addMessage(mes);
+ connectionListening(false,dc);
+ } catch (Exception e2) {
+ Log.error(e2);
+ }
+
+ } catch (JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ //This is a transport failure. We
should define our own Transport Failure class
+ //for a better execption catching
+
+ try {
+ addMessage(mes);
+ } catch (Exception e2) {
+ Log.error(e2);
+ }
+ Log.error("Cannot deliver this message
to the client "+dc);
+ Log.error(e);
+ handleConnectionFailure(dc,null);
+ }
+
+ }
+
+ //Notify that it has finished its work : another
thread can start working on this queue
+ endWork();
+
+ }
}
}
@@ -339,30 +344,37 @@
void connectionListening(boolean mode,SpyDistributedConnection dc) throws
JMSException
{
- SpyDistributedConnection
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
- if (distributedConnection==null) throw new JMSException("This
DistributedConnection is not registered");
+
+ // Before check
- if (mode) {
- if (!distributedConnection.listeners) {
- distributedConnection.listeners=true;
- listeners++;
- }
- } else {
- if (distributedConnection.listeners) {
- distributedConnection.listeners=false;
- listeners--;
+ // Synchronized code : We want to avoid sending messages while we are
changing the connection status
+ synchronized (this) {
+
+
+ SpyDistributedConnection
distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
+ if (distributedConnection==null) throw new JMSException("This
DistributedConnection is not registered");
+
+ if (mode) {
+ if (!distributedConnection.listeners) {
+ distributedConnection.listeners=true;
+ listeners++;
+ }
+ } else {
+ if (distributedConnection.listeners) {
+ distributedConnection.listeners=false;
+ listeners--;
+ }
}
- }
- if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
- synchronized (messages) {
- if (!messages.isEmpty()) notifyWorkers();
+ if (listeners!=0&&!threadWorking&&!alreadyInTaskQueue) {
+ synchronized (messages) {
+ if (!messages.isEmpty()) notifyWorkers();
+ }
}
- }
-
- Log.log("Listeners for "+destination+" = "+listeners);
+ Log.log("Listeners for "+destination+" = "+listeners);
+ }
}
}
1.4 +6 -5 spyderMQ/src/java/org/spydermq/SessionQueue.java
Index: SessionQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SessionQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SessionQueue.java 2000/06/01 20:11:48 1.3
+++ SessionQueue.java 2000/06/05 03:19:23 1.4
@@ -21,7 +21,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SessionQueue
{
@@ -180,15 +180,15 @@
public void dispatchMessage(Destination dest, SpyMessage mes) throws
JMSException
{
Log.log("SessionQueue:
dispatchMessage(Destination="+dest.toString()+",Mes="+mes.toString()+")");
-
+
while (true) {
-
+
+ //We should synchronize there !!
if (session.closed) throw new NoReceiverException("The session
is closed");
if (session.modeStop) throw new NoReceiverException("The
session is stopped");
if (NumListeningSubscribers==0) throw new
NoReceiverException("There are no receivers for this destination !");
if (mes.isOutdated()) return;
- //We should use a linked list of all Receivers (enable
synchronization)
Iterator i=subscribers.iterator();
SpyQueueReceiver receiver=null;
while (i.hasNext()) {
@@ -200,8 +200,8 @@
throw new NoReceiverException("The listeners count was
invalid !");
}
+ //Work with this receiver
synchronized (receiver.messages) {
-
if (receiver.messageListener==null) {
if (!receiver.waitInReceive) {
Log.notice("The receiver was not
waiting for a message !");
@@ -216,6 +216,7 @@
}
}
+ //Our work is done here
break;
}
1.4 +26 -8 spyderMQ/src/java/org/spydermq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyConnection.java 2000/06/01 00:04:40 1.3
+++ SpyConnection.java 2000/06/05 03:19:23 1.4
@@ -29,7 +29,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyConnection
implements Connection, Serializable
@@ -235,14 +235,32 @@
if (distributedConnection==null) createReceiver();
try {
-
- provider.newMessage(mes,clientID);
-
- /*if (mes.jmsDestination instanceof Topic) {
- //If this message is sent to a topic, we can try to
deliver it locally
-
distributedConnection.cr.receive(mes.jmsDestination,mes);
- }*/
+
+ for(int i=0;i<mes.length;i++) {
+
+ SpyMessage message=mes[i];
+
+ if (message.isOutdated()) continue;
+ Class messageClass=message.jmsDestination.getClass();
+
+ if (messageClass==SpyTemporaryTopic.class) {
+ if
(((SpyTemporaryTopic)message.jmsDestination).dc.equals(distributedConnection)) {
+ Log.log("local");
+ }
+ } else if (messageClass==SpyTemporaryQueue.class) {
+ if
(((SpyTemporaryQueue)message.jmsDestination).dc.equals(distributedConnection)) {
+ Log.log("local");
+ }
+ } else if (messageClass==SpyTopic.class) {
+ //Alpha mode test
+ } else {
+ //Alpha mode test + local delivery
+ }
+
+ }
+ provider.newMessage(mes,clientID);
+
} catch (Exception e) {
failureHandler(e,"Cannot send a message to the JMS provider");
}
1.3 +6 -4 spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java
Index: SpyQueueReceiver.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueReceiver.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyQueueReceiver.java 2000/06/01 01:14:29 1.2
+++ SpyQueueReceiver.java 2000/06/05 03:19:23 1.3
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyQueueReceiver
extends SpyMessageConsumer
@@ -51,10 +51,12 @@
public void close() throws JMSException
{
- if (closed) return;
- closed=true;
+ synchronized (messages) {
+ if (closed) return;
+ closed=true;
- setListening(false);
+ setListening(false);
+ }
}
//Overrides MessageConsumer
1.2 +2 -2 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyQueueSession.java 2000/05/31 18:06:45 1.1
+++ SpyQueueSession.java 2000/06/05 03:19:23 1.2
@@ -23,7 +23,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyQueueSession
extends SpySession
@@ -138,7 +138,7 @@
}
//Notify the [sleeping ?] thread that there is work to do
- //We should not wait for the lock...
+ //We should not have to wait for the lock...
synchronized (thread)
{
thread.notify();