User: hiram
Date: 00/11/22 09:43:33
Modified: src/java/org/spydermq JMSServerQueueReceiver.java
JMSServerQueue.java JMSServer.java
Log:
Added a crude persistence scheme to support persistent messages.
Its transactional behavior still needs some work.
Revision Changes Path
1.2 +12 -5 spyderMQ/src/java/org/spydermq/JMSServerQueueReceiver.java
Index: JMSServerQueueReceiver.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueueReceiver.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- JMSServerQueueReceiver.java 2000/11/19 19:59:57 1.1
+++ JMSServerQueueReceiver.java 2000/11/22 17:43:33 1.2
@@ -11,6 +11,7 @@
import java.io.Serializable;
import org.spydermq.distributed.interfaces.ConnectionReceiver;
import org.spydermq.distributed.interfaces.ConnectionReceiverSetup;
+import javax.jms.DeliveryMode;
/**
* This class manages a connection receiver for a JMSServerQueue.
@@ -18,7 +19,7 @@
*
*@author Hiram Chirino ([EMAIL PROTECTED])
*
- *@version $Revision: 1.1 $
+ *@version $Revision: 1.2 $
*/
public class JMSServerQueueReceiver implements Serializable {
@@ -47,7 +48,7 @@
}
- void acknowledge(String messageId, boolean ack) {
+ void acknowledge(String messageId, boolean ack) throws javax.jms.JMSException {
SpyMessage m;
synchronized (unacknowledgedMessages) {
m = (SpyMessage) unacknowledgedMessages.remove(messageId);
@@ -55,18 +56,24 @@
if (m == null)
return;
-
- if (!jmsSeverQueue.isTopic) {
+
+ if (jmsSeverQueue.isTopic) {
// Not sure how we should handle the topic case.
// On a negative acknowledge, we don't want to
// add it back to the topic since other
// receivers might get a duplicate duplicate message.
- } else {
+ } else {
// Was it a negative acknowledge??
if (!ack) {
Log.log("Restoring message: " + m.messageId);
jmsSeverQueue.restoreMessage(m);
} else {
+
+ if( m.getJMSDeliveryMode() == DeliveryMode.PERSISTENT
) {
+
jmsSeverQueue.spyMessageLog.logRemoveMessage(m);
+ jmsSeverQueue.spyMessageLog.commit();
+ }
+
Log.log("Message Ack: " + m.messageId);
}
}
1.18 +43 -10 spyderMQ/src/java/org/spydermq/JMSServerQueue.java
Index: JMSServerQueue.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServerQueue.java,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- JMSServerQueue.java 2000/11/19 19:59:56 1.17
+++ JMSServerQueue.java 2000/11/22 17:43:33 1.18
@@ -13,13 +13,15 @@
import java.util.LinkedList;
import java.util.HashMap;
import java.util.TreeSet;
+import javax.jms.DeliveryMode;
+import org.spydermq.persistence.SpyMessageLog;
/**
*This class is a message queue which is stored (hashed by Destination) on the JMS
provider
*
*@author Norbert Lataille ([EMAIL PROTECTED])
*
- *@version $Revision: 1.17 $
+ *@version $Revision: 1.18 $
*/
public class JMSServerQueue {
// Attributes ----------------------------------------------------
@@ -52,9 +54,12 @@
// Should we use the round robin aproach to pick the next reciver of a p2p
message?
private boolean useRoundRobinMessageDistribution = true;
+ // Used to log the persistent messages.
+ SpyMessageLog spyMessageLog;
+
// Constructor ---------------------------------------------------
- JMSServerQueue(SpyDestination dest,SpyDistributedConnection
temporary,JMSServer server)
+ JMSServerQueue(SpyDestination dest,SpyDistributedConnection
temporary,JMSServer server) throws JMSException
{
destination=dest;
subscribers=new HashMap();
@@ -65,6 +70,15 @@
this.server=server;
isTopic=dest instanceof SpyTopic;
listeners=0;
+
+ spyMessageLog = new SpyMessageLog( dest.name+"-transaction-log.dat" );
+ SpyMessage[] rebuild = spyMessageLog.rebuildMessagesFromLog();
+ for( int i=0; i < rebuild.length; i++ ) {
+ restoreMessage( rebuild[i] );
+ messageIdCounter = Math.max( messageIdCounter,
rebuild[i].messageId+1 );
+ }
+ Log.notice("Restored "+rebuild.length+" messages to "+dest.name+" from
the transaction log");
+
}
@@ -106,7 +120,7 @@
- public void addMessage(SpyMessage mes)
+ public void addMessage(SpyMessage mes) throws JMSException
{
//Add a message to the message list...
synchronized (messages)
@@ -115,7 +129,13 @@
//messages is now an ordered tree. The order depends
//first on the priority and then on messageId
mes.messageId = messageIdCounter++;
- messages.add(mes);
+
+ if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
+ spyMessageLog.logAddMessage(mes);
+ spyMessageLog.commit();
+ }
+
+ messages.add(mes);
if (isTopic) {
//if a thread is already working on this destination,
I don't have to myself to the taskqueue
@@ -268,10 +288,20 @@
void doMyJob() throws JMSException
{
if (isTopic) {
-
+
//Clear the message queue
SpyMessage[] msgs=startWork();
-
+
+ boolean msgRemoved=false;
+ for( int i=0; i < msgs.length; i++ ) {
+ if( msgs[i].getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT ) {
+ spyMessageLog.logRemoveMessage(msgs[i]);
+ msgRemoved=true;
+ }
+ }
+ if( msgRemoved )
+ spyMessageLog.commit();
+
//Let the thread do its work
if (msgs.length == 1) {
@@ -308,8 +338,13 @@
break;
}
- if (mes.isOutdated())
+ if (mes.isOutdated()) {
+ if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT ) {
+
spyMessageLog.logRemoveMessage(mes);
+ spyMessageLog.commit();
+ }
continue;
+ }
// we may have to restore the
lastUsedQueueReceiver
// if message on the queue is not sent. (we
don't want to skip
@@ -522,8 +557,6 @@
}
}
- }
-
-
+ }
}
1.10 +3 -3 spyderMQ/src/java/org/spydermq/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/JMSServer.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- JMSServer.java 2000/11/19 19:59:56 1.9
+++ JMSServer.java 2000/11/22 17:43:33 1.10
@@ -22,7 +22,7 @@
*
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
@@ -268,7 +268,7 @@
return newQueue;
}
- public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection
dc)
+ public synchronized TemporaryTopic getTemporaryTopic(SpyDistributedConnection
dc) throws JMSException
{
SpyTemporaryTopic topic=new SpyTemporaryTopic("JMS_TT"+(new
Integer(lastTemporaryTopic++).toString()),dc);
@@ -282,7 +282,7 @@
return topic;
}
- public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection
dc)
+ public synchronized TemporaryQueue getTemporaryQueue(SpyDistributedConnection
dc) throws JMSException
{
SpyTemporaryQueue newQueue=new SpyTemporaryQueue("JMS_TQ"+(new
Integer(lastTemporaryQueue++).toString()),dc);