User: hiram
Date: 01/01/15 19:57:35
Modified: src/java/org/spydermq/server AbstractQueue.java
BasicQueue.java ClientConsumer.java
ExclusiveQueue.java JMSDestination.java
JMSServer.java PersistenceManager.java
SharedQueue.java
Removed: src/java/org/spydermq/server MessageEnvelope.java
NonPersistentMessageEnvelope.java
PersistentMessageEnvelope.java
Log:
Backed off the last set of changes meant to improve performance since they were
causing worse performance.
Revision Changes Path
1.3 +2 -4 spyderMQ/src/java/org/spydermq/server/AbstractQueue.java
Index: AbstractQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/AbstractQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- AbstractQueue.java 2001/01/10 13:57:44 1.2
+++ AbstractQueue.java 2001/01/16 03:57:32 1.3
@@ -15,13 +15,11 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public interface AbstractQueue {
-
- public void removeConsumer(ClientConsumer consumer) throws JMSException;
public void addConsumer(ClientConsumer consumer) throws JMSException;
-
- public void addMessage(MessageEnvelope mes, Long txId) throws JMSException;
+ public void addMessage(SpyMessage mes, Long txId) throws JMSException;
void notifyMessageAvailable();
+ public void removeConsumer(ClientConsumer consumer) throws JMSException;
}
1.3 +60 -62 spyderMQ/src/java/org/spydermq/server/BasicQueue.java
Index: BasicQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/BasicQueue.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- BasicQueue.java 2001/01/10 13:57:44 1.2
+++ BasicQueue.java 2001/01/16 03:57:32 1.3
@@ -28,7 +28,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
abstract public class BasicQueue implements Task, AbstractQueue {
@@ -47,7 +47,49 @@
this.server=server;
}
+
+ //Used to put a message that was added previously to the queue, back in the
queue
+ public void restoreMessage(SpyMessage mes)
+ {
+ //restore a message to the message list...
+ synchronized (messages) {
+ messages.add(mes);
+ }
+ notifyMessageAvailable();
+ }
+
+ public void addMessage(SpyMessage mes, Long txId) throws JMSException
+ {
+ Log.log(""+this+"->addMessage(mes="+mes+",txId="+txId+")");
+ // This task gets run to make the message visible in the queue.
+ class AddMessagePostCommitTask implements Runnable {
+ SpyMessage message;
+
+ AddMessagePostCommitTask(SpyMessage m) {
+ message = m;
+ }
+
+ public void run() {
+ //restore a message to the message list...
+ synchronized (messages) {
+ messages.add(message);
+ }
+ notifyMessageAvailable();
+ }
+ }
+
+ // The message gets added to the queue after the transaction
+ // commits (if the message was transacted)
+ Runnable task = new AddMessagePostCommitTask(mes);
+ if( txId == null ) {
+ task.run();
+ } else {
+ server.persistenceManager.addPostCommitTask(txId, task);
+ }
+
+ }
+
//
public void addConsumer(ClientConsumer consumer) throws JMSException
{
@@ -62,36 +104,29 @@
public SpyMessage[] browse(String selector) throws JMSException {
if( selector == null ) {
- MessageEnvelope list[];
+ SpyMessage list[];
synchronized (messages) {
- list = new MessageEnvelope[messages.size()];
- list = (MessageEnvelope [])messages.toArray(list);
+ list = new SpyMessage[messages.size()];
+ list = (SpyMessage [])messages.toArray(list);
}
-
- SpyMessage messageList[] = new SpyMessage[list.length];
- for( int i=0; i < list.length; i++ )
- messageList[i] = list[i].getMessage();
- return messageList;
-
+ return list;
} else {
Selector s = new Selector( selector );
LinkedList selection=new LinkedList();
-
- MessageEnvelope list[];
+
synchronized (messages) {
- list = new MessageEnvelope[messages.size()];
- list = (MessageEnvelope [])messages.toArray(list);
- }
-
- for( int i=0; i < list.length; i++ ) {
- SpyMessage m = list[i].getMessage();
- if( s.test(m) )
- selection.add(m);
+ Iterator i = messages.iterator();
+ while( i.hasNext() ) {
+ SpyMessage m = (SpyMessage)i.next();
+ if( s.test(m) )
+ selection.add(m);
+ }
}
-
- SpyMessage messageList[] = new SpyMessage[selection.size()];
- messageList = (SpyMessage [])selection.toArray(messageList);
- return messageList;
+
+ SpyMessage list[];
+ list = new SpyMessage[selection.size()];
+ list = (SpyMessage [])selection.toArray(list);
+ return list;
}
}
@@ -113,10 +148,10 @@
if (messages.size()==0)
return null;
- MessageEnvelope m = (MessageEnvelope)messages.first();
+ SpyMessage m = (SpyMessage)messages.first();
messages.remove(m);
- return m.getMessage();
+ return m;
}
}
@@ -129,41 +164,4 @@
}
}
- public void addMessage(MessageEnvelope mes, Long txId) throws JMSException
- {
- Log.log(""+this+"->addMessage(mes="+mes.messageId+",txId="+txId+")");
-
- // This task gets run to make the message visible in the queue.
- class AddMessagePostCommitTask implements Runnable {
- MessageEnvelope message;
-
- AddMessagePostCommitTask(MessageEnvelope m) {
- message = m;
- }
-
- public void run() {
- //restore a message to the message list...
- synchronized (messages) {
- messages.add(message);
- }
- notifyMessageAvailable();
- }
- }
-
- // The message gets added to the queue after the transaction
- // commits
- Runnable task = new AddMessagePostCommitTask( mes );
- server.persistenceManager.addPostCommitTask(txId, task);
-
- }
-
- //Used to put a message that was added previously to the queue, back in the
queue
- public void restoreMessage(MessageEnvelope mes)
- {
- //restore a message to the message list...
- synchronized (messages) {
- messages.add(mes);
- }
- notifyMessageAvailable();
- }
}
1.8 +7 -8 spyderMQ/src/java/org/spydermq/server/ClientConsumer.java
Index: ClientConsumer.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ClientConsumer.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- ClientConsumer.java 2001/01/10 13:57:43 1.7
+++ ClientConsumer.java 2001/01/16 03:57:32 1.8
@@ -27,7 +27,7 @@
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.7 $
+ * @version $Revision: 1.8 $
*/
public class ClientConsumer implements Task {
@@ -389,23 +389,22 @@
Iterator i = queue.messages.iterator();
while( i.hasNext() ) {
- MessageEnvelope envelope = (MessageEnvelope)i.next();
- SpyMessage headers = envelope.getHeadersMessage();
+ SpyMessage message = (SpyMessage)i.next();
- LinkedList l = (LinkedList)destinationSubscriptions.get(
headers.getJMSDestination() );
+ LinkedList l = (LinkedList)destinationSubscriptions.get(
message.getJMSDestination() );
if( l == null ) return false;
Iterator subs = l.iterator();
while( subs.hasNext() ) {
Subscription s = (Subscription)subs.next();
- if( s.accepts( headers, true ) ) {
+ if( s.accepts( message, true ) ) {
s.receiving = false;
i.remove();
ReceiveRequest r = new ReceiveRequest();
- r.message = envelope.getMessage();
+ r.message = message;
r.subscriptionId = new
Integer(s.subscriptionId);
synchronized (messages) {
@@ -413,13 +412,13 @@
}
AcknowledgementRequest ack = new
AcknowledgementRequest();
- ack.destination = headers.getJMSDestination();
- ack.messageID = headers.getJMSMessageID();
+ ack.destination = message.getJMSDestination();
+ ack.messageID = message.getJMSMessageID();
ack.subscriberId = s.subscriptionId;
ack.isAck = false;
synchronized (unacknowledgedMessages) {
- unacknowledgedMessages.put(ack,
r.message);
+ unacknowledgedMessages.put(ack,
message);
}
notifyMessageAvailable();
1.6 +6 -28 spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java
Index: ExclusiveQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/ExclusiveQueue.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- ExclusiveQueue.java 2001/01/10 13:57:43 1.5
+++ ExclusiveQueue.java 2001/01/16 03:57:33 1.6
@@ -24,19 +24,11 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class ExclusiveQueue extends BasicQueue {
- private static final long MSG_IDLE_TIME_ALLOWENCE = 1000*10; // 10 seconds
- private long lastActiveConsumerTS=0;
- // Constructor ---------------------------------------------------
- public ExclusiveQueue(JMSServer server) throws JMSException
- {
- super(server);
- }
-
// Iterate over the consumers asking them to take messages until they stop
// consuming.
public void run() throws JMSException
@@ -46,24 +38,7 @@
synchronized (messages) {
synchronized (consumers) {
-
- if( consumers.size() ==0 ) {
- if( (System.currentTimeMillis() -
lastActiveConsumerTS)
- > MSG_IDLE_TIME_ALLOWENCE ) {
-
- // No consumers.. move messages to
secondary storage
- // There have been no consumers for a
while.
- Iterator i = messages.iterator();
- while( i.hasNext() ) {
- MessageEnvelope me =
(MessageEnvelope)i.next();
- me.moveToSecondaryStorage();
- }
-
- }
-
- return;
- }
-
+
LinkedList consumersDone = new LinkedList();
while( consumers.size()!=0 && messages.size() != 0) {
@@ -86,8 +61,6 @@
while( consumersDone.size() != 0 ) {
consumers.addLast(consumersDone.removeFirst());
}
-
- lastActiveConsumerTS = System.currentTimeMillis();
}
}
@@ -98,4 +71,9 @@
return "ExclusiveQueue";
}
+ // Constructor ---------------------------------------------------
+ public ExclusiveQueue(JMSServer server) throws JMSException
+ {
+ super(server);
+ }
}
1.6 +10 -15 spyderMQ/src/java/org/spydermq/server/JMSDestination.java
Index: JMSDestination.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSDestination.java,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- JMSDestination.java 2001/01/10 13:57:43 1.5
+++ JMSDestination.java 2001/01/16 03:57:33 1.6
@@ -26,7 +26,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.5 $
+ * @version $Revision: 1.6 $
*/
public class JMSDestination {
@@ -84,7 +84,7 @@
if( isTopic ) {
- sharedQueue.addMessage(new MessageEnvelope(mes), txId);
+ sharedQueue.addMessage(mes, txId);
synchronized (exclusiveQueues) {
@@ -97,12 +97,10 @@
String queueId = (String)iter.next();
ExclusiveQueue eq =
(ExclusiveQueue)exclusiveQueues.get(queueId);
- if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT ) {
- java.io.File f =
server.persistenceManager.add(queueId, mes, txId);
- eq.addMessage(new
PersistentMessageEnvelope(mes,f), txId);
- } else {
- eq.addMessage(new
NonPersistentMessageEnvelope(mes,
server.persistenceManager.getPersistenceFileFor(mes,DEFAULT_QUEUE_ID)), txId);
- }
+ if( mes.getJMSDeliveryMode() ==
DeliveryMode.PERSISTENT )
+ server.persistenceManager.add(queueId,
mes, txId);
+
+ eq.addMessage(mes, txId);
}
}
@@ -110,13 +108,10 @@
} else {
ExclusiveQueue eq = (ExclusiveQueue)exclusiveQueues.get(
DEFAULT_QUEUE_ID );
- if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ) {
- java.io.File f =
server.persistenceManager.add(DEFAULT_QUEUE_ID, mes, txId);
- eq.addMessage(new PersistentMessageEnvelope(mes, f),
txId);
- } else {
- eq.addMessage(new NonPersistentMessageEnvelope(mes,
server.persistenceManager.getPersistenceFileFor(mes,DEFAULT_QUEUE_ID)), txId);
- }
-
+ if( mes.getJMSDeliveryMode() == DeliveryMode.PERSISTENT )
+ server.persistenceManager.add(DEFAULT_QUEUE_ID, mes,
txId);
+
+ eq.addMessage(mes, txId);
}
@@ -195,7 +190,7 @@
{
Log.log(""+this+"->restoreMessage(mes="+mes+",queue="+queueId+")");
ExclusiveQueue eq = getExclusiveQueue(queueId);
- eq.restoreMessage( new MessageEnvelope(mes) );
+ eq.restoreMessage(mes);
}
public String toString() {
1.13 +0 -2 spyderMQ/src/java/org/spydermq/server/JMSServer.java
Index: JMSServer.java
===================================================================
RCS file: /products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/JMSServer.java,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- JMSServer.java 2001/01/10 13:57:43 1.12
+++ JMSServer.java 2001/01/16 03:57:33 1.13
@@ -27,7 +27,7 @@
* @author Norbert Lataille ([EMAIL PROTECTED])
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.12 $
+ * @version $Revision: 1.13 $
*/
public class JMSServer
implements Runnable, JMSServerMBean
@@ -151,8 +151,6 @@
task.run();
} catch (JMSException e) {
Log.error(e);
- if( e.getLinkedException() != null )
- Log.error( e.getLinkedException() );
}
}
1.7 +26 -61 spyderMQ/src/java/org/spydermq/server/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/PersistenceManager.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- PersistenceManager.java 2001/01/10 13:57:43 1.6
+++ PersistenceManager.java 2001/01/16 03:57:33 1.7
@@ -16,19 +16,17 @@
import org.spydermq.xml.XElement;
import org.spydermq.persistence.SpyTxLog;
-import org.spydermq.persistence.SpyMessageQueue;
+import org.spydermq.persistence.SpyMessageLog;
import org.spydermq.SpyDestination;
import org.spydermq.SpyMessage;
import org.spydermq.SpyDistributedConnection;
-import java.io.File;
-
/**
* This class manages all persistence related services.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class PersistenceManager {
@@ -37,10 +35,10 @@
// The configuration data for the manager.
XElement configElement;
// The directory where persistence data should be stored
- File dataDirectory;
+ URL dataDirectory;
// Log file used to store commited transactions.
SpyTxLog spyTxLog;
- // Maps SpyDestinations to SpyMessageQueues
+ // Maps SpyDestinations to SpyMessageLogs
HashMap messageLogs = new HashMap();
// Maps (Long)txIds to LinkedList of Runnable tasks
HashMap postCommitTasks = new HashMap();
@@ -78,11 +76,11 @@
}
static class LogInfo {
- SpyMessageQueue log;
+ SpyMessageLog log;
SpyDestination destination;
String queueId;
- LogInfo(SpyMessageQueue log, SpyDestination destination, String
queueId) {
+ LogInfo(SpyMessageLog log, SpyDestination destination, String queueId)
{
this.log=log;
this.destination=destination;
this.queueId=queueId;
@@ -101,10 +99,9 @@
this.configElement = configElement;
URL configFile =
getClass().getClassLoader().getResource("spyderMQ.xml");
- dataDirectory = new File(new URL(configFile,
configElement.getField("DataDirectory")).getFile());
- dataDirectory.mkdirs();
- File txLogFile = new File(dataDirectory, "transactions.dat");
- spyTxLog = new SpyTxLog(txLogFile.getAbsolutePath());
+ dataDirectory = new URL(configFile,
configElement.getField("DataDirectory"));
+ URL txLogFile = new URL(dataDirectory, "transactions.dat");
+ spyTxLog = new SpyTxLog(txLogFile.getFile());
} catch (Exception e) {
javax.jms.JMSException newE = new
javax.jms.JMSException("Invalid configuration.");
@@ -115,8 +112,21 @@
}
+ public void add(String queueId, org.spydermq.SpyMessage message, Long txId)
throws javax.jms.JMSException {
+ LogInfo logInfo;
+ synchronized (messageLogs) {
+ logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
+
+ logInfo.log.add(message, txId);
+
+ }
+
public void addPostCommitTask(Long txId, Runnable task) throws
javax.jms.JMSException {
@@ -289,9 +299,8 @@
try {
- File logFile = new File(dataDirectory,
dest.toString()+"-"+queueId+".dat");
- File messageDirectory = new File(dataDirectory,
dest.toString()+"-"+queueId+"-messages");
- SpyMessageQueue log = new SpyMessageQueue(this,
logFile.getAbsolutePath(), messageDirectory.getAbsolutePath());
+ URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
+ SpyMessageLog log = new SpyMessageLog(logFile.getFile());
LogInfo info = new LogInfo(log, dest, queueId);
@@ -311,9 +320,10 @@
try {
- File file = new File(dataDirectory,
dest.toString()+"-"+queueId+".dat");
+ URL logFile = new URL(dataDirectory,
dest.toString()+"-"+queueId+".dat");
+ java.io.File file = new java.io.File(logFile.getFile());
- SpyMessageQueue log =
(SpyMessageQueue)messageLogs.remove(""+dest+"-"+queueId);
+ SpyMessageLog log =
(SpyMessageLog)messageLogs.remove(""+dest+"-"+queueId);
if( log == null )
throw new JMSException("The persistence log was never
initialized");
log.close();
@@ -327,51 +337,6 @@
newE.setLinkedException(e);
throw newE;
}
-
- }
-
- public java.io.File add(String queueId, org.spydermq.SpyMessage message, Long
txId) throws javax.jms.JMSException {
-
- LogInfo logInfo;
-
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+message.getJMSDestination()+"-"+queueId);
- }
-
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
-
- return logInfo.log.add(message, txId);
-
- }
-
- public File getPersistenceFileFor(SpyMessage mes, String queueId) throws
javax.jms.JMSException {
-
- LogInfo logInfo;
-
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+mes.getJMSDestination()+"-"+queueId);
- }
-
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
-
- return logInfo.log.messageIdToFile(mes.messageId);
-
- }
-
- public File getSpyMessageQueue(SpyMessage mes, String queueId) throws
javax.jms.JMSException {
-
- LogInfo logInfo;
-
- synchronized (messageLogs) {
- logInfo = (LogInfo)
messageLogs.get(""+mes.getJMSDestination()+"-"+queueId);
- }
-
- if (logInfo == null)
- throw new javax.jms.JMSException("Destination was not
initalized with the PersistenceManager");
-
- return logInfo.log.messageIdToFile(mes.messageId);
}
}
1.5 +5 -5 spyderMQ/src/java/org/spydermq/server/SharedQueue.java
Index: SharedQueue.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/server/SharedQueue.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- SharedQueue.java 2001/01/10 13:57:43 1.4
+++ SharedQueue.java 2001/01/16 03:57:34 1.5
@@ -23,7 +23,7 @@
* @author Hiram Chirino ([EMAIL PROTECTED])
* @author Norbert Lataille ([EMAIL PROTECTED])
*
- * @version $Revision: 1.4 $
+ * @version $Revision: 1.5 $
*/
public class SharedQueue extends BasicQueue {
@@ -38,7 +38,7 @@
public void run() throws JMSException
{
Log.log(""+this+"->run()");
- MessageEnvelope[] envelopes;
+ SpyMessage[] job;
synchronized (messages) {
if( messages.size() == 0 ) {
@@ -46,8 +46,8 @@
return;
}
- envelopes=new MessageEnvelope[messages.size()];
- envelopes=(MessageEnvelope[])messages.toArray(envelopes);
+ job=new SpyMessage[messages.size()];
+ job=(SpyMessage[])messages.toArray(job);
messages.clear();
}
@@ -61,8 +61,8 @@
ClientConsumer consumer = (ClientConsumer)iter.next();
- for( int i=0 ; i < envelopes.length; i++ )
- consumer.addMessage(envelopes[i].getMessage());
+ for( int i=0 ; i < job.length; i++ )
+ consumer.addMessage(job[i]);
consumer.notifyMessageAvailable();