User: hiram
Date: 00/11/13 22:15:54
Modified: src/java/org/spydermq SpyQueueSession.java SpySession.java
SpyTopicSession.java
Log:
We can now disable client side persistence. All actions
which require persistence are synchronized with the
server for it to do the persistence.
Only the QueueSessions have this option turned on for now.
Revision Changes Path
1.4 +27 -9 spyderMQ/src/java/org/spydermq/SpyQueueSession.java
Index: SpyQueueSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyQueueSession.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- SpyQueueSession.java 2000/06/14 19:16:51 1.3
+++ SpyQueueSession.java 2000/11/14 06:15:53 1.4
@@ -18,12 +18,12 @@
import java.util.HashMap;
import java.util.Iterator;
-/**
+import javax.jms.DeliveryMode;/**
* This class implements javax.jms.QueueSession
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.3 $
+ * @version $Revision: 1.4 $
*/
public class SpyQueueSession
extends SpySession
@@ -35,11 +35,12 @@
SpyQueueSession(SpyConnection myConnection, boolean transacted, int
acknowledgeMode, boolean stop)
{
super(myConnection,transacted,acknowledgeMode,stop);
+ clientPersistence = false;
}
// Public --------------------------------------------------------
- public QueueBrowser createBrowser(Queue queue) throws JMSException
+ public QueueBrowser createBrowser(Queue queue) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -62,7 +63,7 @@
return ((SpyQueueConnection)connection).createQueue(queueName);
}
- public QueueReceiver createReceiver(Queue queue) throws JMSException
+ public QueueReceiver createReceiver(Queue queue) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -72,7 +73,7 @@
return receiver;
}
- public QueueReceiver createReceiver(Queue queue, String messageSelector) throws
JMSException
+ public QueueReceiver createReceiver(Queue queue, String messageSelector)
throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -80,13 +81,13 @@
return createReceiver(queue);
}
- public QueueSender createSender(Queue queue) throws JMSException
+ public QueueSender createSender(Queue queue) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyQueueSender(this,queue);
}
-
+
public TemporaryQueue createTemporaryQueue() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -108,8 +109,25 @@
//called by a MessageProducer object which needs to send a message
void sendMessage(SpyMessage m) throws JMSException
{
- if (closed) throw new IllegalStateException("The session is closed");
-
+ if (closed) throw new IllegalStateException("The session is closed");
+
+ // If client is not doing persistence then we have to make sure the
server
+ // gets the persistent message before we return. (This is done in the
commit for
+ // transacted sessions.)
+ if( !clientPersistence && transacted &&
m.getJMSDeliveryMode()==DeliveryMode.PERSISTENT) {
+ //Wait for the sending thread to sleep
+ synchronized (mutex) {
+ mutex.waitToSleep();
+
+ SpyMessage job[] = { m };
+ connection.sendToServer( job );
+
+ //We have finished our work, we can wake up the thread
+ mutex.notifyLock();
+ }
+ return;
+ }
+
//Synchronize with the outgoingQueue
synchronized (outgoingQueue)
{
1.11 +29 -8 spyderMQ/src/java/org/spydermq/SpySession.java
Index: SpySession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpySession.java,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -r1.10 -r1.11
--- SpySession.java 2000/11/12 00:30:15 1.10
+++ SpySession.java 2000/11/14 06:15:53 1.11
@@ -29,7 +29,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.10 $
+ * @version $Revision: 1.11 $
*/
public class SpySession
implements Runnable, Session
@@ -59,7 +59,9 @@
public Mutex mutex;
//Is this session in alpha mode ?
public boolean alphaMode;
-
+ // Should we do client side persistence?
+ public boolean clientPersistence = true;
+
// Constructor ---------------------------------------------------
SpySession(SpyConnection conn, boolean trans, int acknowledge, boolean stop)
@@ -261,7 +263,7 @@
// allow other threads to process before closing this session
// Patch submitted by John Ellis (10/29/00)
Thread.yield();
-
+
if (closed) return;
closed=true;
@@ -300,9 +302,28 @@
modeStop=true;
//Wait for the thread to sleep
- synchronized (mutex) {
-
+ synchronized (mutex) {
mutex.waitToSleep();
+
+ // If we are not doing client side persistence, then we have
to send the
+ // persistent messages to server and confirm that they have
been received before we return.
+ if( !clientPersistence ) {
+
+ LinkedList persistentMessages = new LinkedList();
+ java.util.ListIterator iter =
outgoingQueue.listIterator();
+ while( iter.hasNext() ) {
+ SpyMessage sm = (SpyMessage)iter.next();
+ if( sm.getJMSDeliveryMode() ==
javax.jms.DeliveryMode.PERSISTENT ) {
+ persistentMessages.addLast(sm);
+ iter.remove();
+ }
+ }
+ if (persistentMessages.size()!=0) {
+ SpyMessage job[]=new
SpyMessage[persistentMessages.size()];
+
job=(SpyMessage[])persistentMessages.toArray(job);
+ connection.sendToServer(job);
+ }
+ }
//Move the outgoing messages from the outgoingQueue to the
outgoingCommitedQueue
outgoingCommitedQueue.addAll(outgoingQueue);
@@ -316,6 +337,7 @@
sessionQueue.commit();
}
+
//We have finished our work, we can wake up the thread
modeStop=modeSav;
mutex.notifyLock();
@@ -460,7 +482,7 @@
//The connection has changed its mode (stop() or start())
//We have to wait until message delivery has stopped or wake up the thread
void notifyStopMode(boolean newValue)
- {
+ {
if (closed) throw new IllegalStateException("The session is closed");
if (modeStop==newValue) return;
@@ -480,6 +502,5 @@
}
}
-
-
+
}
1.6 +27 -10 spyderMQ/src/java/org/spydermq/SpyTopicSession.java
Index: SpyTopicSession.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/SpyTopicSession.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- SpyTopicSession.java 2000/07/07 22:37:21 1.5
+++ SpyTopicSession.java 2000/11/14 06:15:54 1.6
@@ -28,7 +28,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class SpyTopicSession
extends SpySession
@@ -44,21 +44,21 @@
// Public --------------------------------------------------------
- public Topic createTopic(String topicName) throws JMSException
+ public Topic createTopic(String topicName) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return ((SpyTopicConnection)connection).createTopic(topicName);
}
- public TopicSubscriber createSubscriber(Topic topic) throws JMSException
+ public TopicSubscriber createSubscriber(Topic topic) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return createSubscriber(topic,null,false);
}
- public TopicSubscriber createSubscriber(Topic topic, String messageSelector,
boolean noLocal) throws JMSException
+ public TopicSubscriber createSubscriber(Topic topic, String messageSelector,
boolean noLocal) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -73,7 +73,7 @@
return sub;
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws
JMSException
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name)
throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -81,7 +81,7 @@
return createSubscriber(topic);
}
- public TopicSubscriber createDurableSubscriber(Topic topic, String name, String
messageSelector, boolean noLocal) throws JMSException
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name,
String messageSelector, boolean noLocal) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
@@ -89,20 +89,20 @@
return createSubscriber(topic);
}
- public TopicPublisher createPublisher(Topic topic) throws JMSException
+ public TopicPublisher createPublisher(Topic topic) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return new SpyTopicPublisher(this,topic);
}
-
+
public TemporaryTopic createTemporaryTopic() throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
return ((SpyTopicConnection)connection).getTemporaryTopic();
}
- public void unsubscribe(String name) throws JMSException
+ public void unsubscribe(String name) throws JMSException
{
//Not yet implemented
}
@@ -138,6 +138,23 @@
void sendMessage(SpyMessage m) throws JMSException
{
if (closed) throw new IllegalStateException("The session is closed");
+
+ // If client is not doing persistence then we have to make sure the
server
+ // gets the persistent message before we return. (This is done in the
commit for
+ // transacted sessions.)
+ if( !clientPersistence && transacted &&
m.getJMSDeliveryMode()==javax.jms.DeliveryMode.PERSISTENT) {
+ //Wait for the sending thread to sleep
+ synchronized (mutex) {
+ mutex.waitToSleep();
+
+ SpyMessage job[] = { m };
+ connection.sendToServer( job );
+
+ //We have finished our work, we can wake up the thread
+ mutex.notifyLock();
+ }
+ return;
+ }
//Synchronize with the outgoingQueue
synchronized (outgoingQueue)
@@ -172,7 +189,7 @@
//Handle persistence
//First shot : use a fs based persistence system
try {
- if (m.persistent) {
+ if (m.persistent && clientPersistence) {
//Log.log("ADD file "+m.getJMSMessageID());
if (m.removed==false) {
ObjectOutputStream output=new
ObjectOutputStream(new FileOutputStream(m.getJMSMessageID()));