User: norbert
Date: 00/05/31 17:04:41
Modified: src/java/org/spydermq JMSServer.java JMSServerQueue.java SpyConnection.java SpyDistributedConnection.java
Log:
P2P system (error fixes)
Revision Changes Path
1.2 +2 -2 spyderMQ/src/java/org/spydermq/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JMSServer.java 2000/05/31 18:06:40 1.1
+++ JMSServer.java 2000/06/01 00:04:40 1.2
@@ -22,7 +22,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class JMSServer
implements Runnable
@@ -207,7 +207,7 @@
JMSServerQueue queue=(JMSServerQueue)messageQueue.get(dest);
if (queue==null) throw new JMSException("This destination does not exist !");
- queue.removeSubscriber(dc);
+ queue.removeSubscriber(dc,null);
}
public synchronized SpyTopic createTopic(String name) throws JMSException
1.3 +21 -15 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- JMSServerQueue.java 2000/05/31 23:22:55 1.2
+++ JMSServerQueue.java 2000/06/01 00:04:40 1.3
@@ -18,7 +18,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class JMSServerQueue
{
@@ -73,16 +73,17 @@
}
}
- void removeSubscriber(SpyDistributedConnection dc)
+ void removeSubscriber(SpyDistributedConnection dc,Iterator i)
{
//We want to avoid removeSubscriber, addSubscriber or sendOneMessage to work concurently
synchronized (destination) {
SpyDistributedConnection distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
if (distributedConnection==null) return;
- listeners-=distributedConnection.listeners;
+ if (distributedConnection.listeners) listeners--;
- subscribers.remove(dc);
+ if (i==null) subscribers.remove(dc);
+ else i.remove();
}
}
@@ -208,7 +209,7 @@
{
if (!subscribers.containsKey(dc.getClientID())) return;
Log.notice("Warning: The DistributedConnection was still registered for "+destination);
- removeSubscriber(dc);
+ removeSubscriber(dc,null);
}
void notifyWorkers()
@@ -232,8 +233,7 @@
server.connectionClosing(dc,this);
//remove this connection from the list
- if (i!=null) i.remove();
- else subscribers.remove(dc.getClientID());
+ removeSubscriber(dc,i);
}
void doMyJob()
@@ -267,14 +267,16 @@
//Get a receiver
//NL: We could find a better receiver (load balancing ?)
+ Log.log("get a receiver");
+
if (listeners==0) break;
Iterator i=subscribers.values().iterator();
SpyDistributedConnection dc=null;
while (i.hasNext()) {
dc=(SpyDistributedConnection)i.next();
- if (dc.listeners!=0) break;
+ if (dc.listeners) break;
}
- if (dc==null||dc.listeners==0) {
+ if (dc==null||!dc.listeners) {
//listeners=0;
Log.error("WARNING: The listeners count was invalid !");
break;
@@ -293,7 +295,7 @@
Log.error(e);
handleConnectionFailure(dc,null);
}
-
+
}
//Notify that it has finished its work : another thread can start working on this queue
@@ -315,12 +317,16 @@
SpyDistributedConnection distributedConnection=(SpyDistributedConnection)subscribers.get(dc.getClientID());
if (distributedConnection==null) throw new JMSException("This DistributedConnection is not registered");
- if (mode) {
- distributedConnection.listeners++;
- listeners++;
+ if (mode) {
+ if (!distributedConnection.listeners) {
+ distributedConnection.listeners=true;
+ listeners++;
+ }
} else {
- distributedConnection.listeners--;
- listeners--;
+ if (distributedConnection.listeners) {
+ distributedConnection.listeners=false;
+ listeners--;
+ }
}
1.3 +4 -4 spyderMQ/src/java/org/spydermq/SpyConnection.java
Index: SpyConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyConnection.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyConnection.java 2000/05/31 23:22:55 1.2
+++ SpyConnection.java 2000/06/01 00:04:40 1.3
@@ -29,7 +29,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyConnection
implements Connection, Serializable
@@ -171,9 +171,9 @@
//notify his sessions
synchronized (createdSessions) {
- Iterator i=createdSessions.iterator();
- while (i.hasNext()) {
- ((SpySession)i.next()).close();
+ Object[] vect=createdSessions.toArray();
+ for(int i=0;i<vect.length;i++) {
+ ((SpySession)vect[i]).close();
}
}
1.2 +2 -2 spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java
Index: SpyDistributedConnection.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyDistributedConnection.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyDistributedConnection.java 2000/05/31 18:06:43 1.1
+++ SpyDistributedConnection.java 2000/06/01 00:04:40 1.2
@@ -14,14 +14,14 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyDistributedConnection
implements Serializable
{
private String clientID;
public ConnectionReceiver cr;
- public transient int listeners;
+ public transient boolean listeners;
SpyDistributedConnection(String id,ConnectionReceiver cr_)
{