User: hiram
Date: 01/01/10 05:57:46
Modified: src/java/org/spydermq/persistence SpyTxLog.java
SpyMessageLog.java SpyMessageLogTester.java
Added: src/java/org/spydermq/persistence SpyMessageQueue.java
Log:
Feature Add: Faster recovery after a spyderMQ server failure
Feature Add: Better server scalability by moving messages not in the working set to
secondary storage.
Revision Changes Path
1.2 +56 -9 spyderMQ/src/java/org/spydermq/persistence/SpyTxLog.java
Index: SpyTxLog.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/persistence/SpyTxLog.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- SpyTxLog.java 2000/12/16 03:27:49 1.1
+++ SpyTxLog.java 2001/01/10 13:57:45 1.2
@@ -15,7 +15,7 @@
* This is used to keep a log of commited transactions.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.1 $
+ * @version $Revision: 1.2 $
*/
public class SpyTxLog {
@@ -24,6 +24,18 @@
/////////////////////////////////////////////////////////////////////
private ObjectIntegrityLog transactionLog;
private long nextTransactionId = Long.MIN_VALUE;
+ private Record record = new Record();
+
+ private final static byte TX_COMMITED = 1;
+ private final static byte TX_CREATED = 0;
+ private final static byte TX_PREPARED = 3;
+ private final static byte TX_ROLLEDBACK = 2;
+
+ static class Record implements Serializable {
+ public static final long serialVersionUID = 1;
+ byte type;
+ long txId;
+ }
/////////////////////////////////////////////////////////////////////
// Constructors
@@ -50,8 +62,12 @@
synchronized public void commitTx(Long id) throws JMSException {
try {
- transactionLog.add(id);
+
+ record.type = TX_COMMITED;
+ record.txId = id.longValue();
+ transactionLog.add(record);
transactionLog.commit();
+
} catch ( IOException e ) {
throwJMSException("Could not create a new transaction.",e);
}
@@ -59,32 +75,63 @@
}
synchronized public Long createTx() throws JMSException {
- return new Long(nextTransactionId++);
+
+ Long newId = new Long(nextTransactionId++);
+
+ try {
+
+ record.type = TX_CREATED;
+ record.txId = newId.longValue();
+ transactionLog.add(record);
+ transactionLog.commit();
+
+ } catch ( IOException e ) {
+ throwJMSException("Could not create a new transaction.",e);
+ }
+
+ return newId;
}
synchronized public java.util.TreeSet restore() throws JMSException {
- java.util.TreeSet items=null;
+ java.util.Vector items=null;
try {
- items = transactionLog.toTreeSet();
+ items = transactionLog.toVector();
} catch ( Exception e ) {
throwJMSException("Could not restore the transaction log.",e);
- }
+ }
+ java.util.TreeSet commitedTxs = new java.util.TreeSet();
long maxId = Long.MIN_VALUE;
java.util.Iterator iter = items.iterator();
while( iter.hasNext() ) {
- Long l = (Long)iter.next();
- if( l.longValue() > maxId )
- maxId = l.longValue();
+
+ Record r = (Record)iter.next();
+ if( r.txId > maxId )
+ maxId = r.txId;
+
+ if( r.type == TX_COMMITED )
+ commitedTxs.add( new Long( maxId ) );
+
}
nextTransactionId = maxId+1;
- return items;
+ return commitedTxs;
}
synchronized public void rollbackTx(Long txId) throws JMSException {
+
+ try {
+
+ record.type = TX_ROLLEDBACK;
+ record.txId = txId.longValue();
+ transactionLog.add(record);
+ transactionLog.commit();
+
+ } catch ( IOException e ) {
+ throwJMSException("Could not create a new transaction.",e);
+ }
}
1.3 +38 -49 spyderMQ/src/java/org/spydermq/persistence/SpyMessageLog.java
Index: SpyMessageLog.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/persistence/SpyMessageLog.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyMessageLog.java 2000/12/16 03:27:48 1.2
+++ SpyMessageLog.java 2001/01/10 13:57:46 1.3
@@ -19,7 +19,7 @@
* provider failure. Integrety is kept by the use of an ObjectIntegrityLog.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyMessageLog {
@@ -27,26 +27,23 @@
// Attributes
/////////////////////////////////////////////////////////////////////
private ObjectIntegrityLog transactionLog;
- private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
- private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
+ private Record record = new Record();
+ // Record Types:
+ private static final byte MESSAGE_ADD_RECORD=0;
+ private static final byte MESSAGE_REMOVE_RECORD=1;
+
/////////////////////////////////////////////////////////////////////
// Helper Inner Classes
/////////////////////////////////////////////////////////////////////
- static class MessageAddedRecord implements Serializable {
+ static class Record implements Serializable {
+ public static final long serialVersionUID = 1;
+ byte type;
long messageId;
boolean isTransacted;
long transactionId;
- SpyMessage message;
}
-
- static class MessageRemovedRecord implements Serializable {
- boolean isTransacted;
- long transactionId;
- long messageId;
- }
-
/////////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////////
@@ -58,7 +55,6 @@
}
}
-
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
@@ -73,16 +69,16 @@
synchronized public void add( SpyMessage message, Long transactionId ) throws
JMSException {
try{
- messageAddedRecord.message = message;
- messageAddedRecord.messageId = message.messageId;
+ record.type = MESSAGE_ADD_RECORD;
+ record.messageId = message.messageId;
if( transactionId == null ) {
- messageAddedRecord.isTransacted = false;
+ record.isTransacted = false;
} else {
- messageAddedRecord.isTransacted = true;
- messageAddedRecord.transactionId =
transactionId.longValue();
+ record.isTransacted = true;
+ record.transactionId = transactionId.longValue();
}
- transactionLog.add(messageAddedRecord);
+ transactionLog.add(record);
transactionLog.commit();
} catch ( IOException e ) {
@@ -94,14 +90,15 @@
synchronized public void remove( SpyMessage message, Long transactionId )
throws JMSException {
try{
- messageRemovedRecord.messageId = message.messageId;
+ record.type = MESSAGE_REMOVE_RECORD;
+ record.messageId = message.messageId;
if( transactionId == null ) {
- messageRemovedRecord.isTransacted = false;
+ record.isTransacted = false;
} else {
- messageRemovedRecord.isTransacted = true;
- messageRemovedRecord.transactionId =
transactionId.longValue();
+ record.isTransacted = true;
+ record.transactionId = transactionId.longValue();
}
- transactionLog.add(messageRemovedRecord);
+ transactionLog.add(record);
transactionLog.commit();
} catch ( IOException e ) {
@@ -109,37 +106,40 @@
}
}
+
- synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws
JMSException {
+
+ private void throwJMSException(String message, Exception e) throws
JMSException {
+ JMSException newE = new JMSException(message);
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ synchronized public Long[] restore(java.util.TreeSet commited) throws
JMSException {
- java.util.HashMap messageIndex = new java.util.HashMap();
+ java.util.Vector messageIndex = new java.util.Vector();
try {
ObjectIntegrityLog.IndexItem objects[] =
transactionLog.toIndex();
for( int i=0; i < objects.length; i++ ) {
- Object o = objects[i].record;
- if( o instanceof MessageAddedRecord ) {
+ Record r = (Record)objects[i].record;
+ if( r.type == MESSAGE_ADD_RECORD ) {
- MessageAddedRecord r = (MessageAddedRecord)o;
- r.message.messageId = r.messageId;
-
if( r.isTransacted && !commited.contains(new
Long(r.transactionId)) ) {
// the TX this message was part of was
not
// commited... so drop this message
continue;
}
- messageIndex.put( new Long(r.messageId),
objects[i]);
+ messageIndex.add( new Long(r.messageId) );
- } else if( o instanceof MessageRemovedRecord ) {
+ } else if( r.type == MESSAGE_REMOVE_RECORD ) {
- MessageRemovedRecord r =
(MessageRemovedRecord)o;
-
if( r.isTransacted && !commited.contains(new
Long(r.transactionId)) ) {
// the TX this message was part of was
not
- // commited... so drop this message
+ // commited... so dont remove this
message...
continue;
}
@@ -152,19 +152,8 @@
throwJMSException("Could not rebuild the queue from the
queue's tranaction log.",e);
}
- SpyMessage rc[] = new SpyMessage[messageIndex.size()];
- java.util.Iterator iter = messageIndex.values().iterator();
- for( int i=0; iter.hasNext(); i++ ) {
- ObjectIntegrityLog.IndexItem item =
(ObjectIntegrityLog.IndexItem)iter.next();
- rc[i] = ((MessageAddedRecord)item.record).message;
- }
+ Long rc[] = new Long[messageIndex.size()];
+ rc = (Long[])messageIndex.toArray(rc);
return rc;
- }
-
- private void throwJMSException(String message, Exception e) throws
JMSException {
- JMSException newE = new JMSException(message);
- newE.setLinkedException(e);
- throw newE;
}
-
}
1.3 +20 -8
spyderMQ/src/java/org/spydermq/persistence/SpyMessageLogTester.java
Index: SpyMessageLogTester.java
===================================================================
RCS file:
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/persistence/SpyMessageLogTester.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- SpyMessageLogTester.java 2000/12/16 03:27:48 1.2
+++ SpyMessageLogTester.java 2001/01/10 13:57:46 1.3
@@ -9,11 +9,11 @@
import org.spydermq.*;
/**
- * This class was used to perform unit testing on the SpyMessageLog/SpyTxLog
+ * This class was used to perform unit testing on the SpyMessageQueue/SpyTxLog
*
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @version $Revision: 1.3 $
*/
public class SpyMessageLogTester {
@@ -23,8 +23,8 @@
*/
public static void main(java.lang.String[] args) throws Exception {
- SpyTxLog tm = new SpyTxLog("SpyTxManager1.dat");
- SpyMessageLog log = new SpyMessageLog("SpyMessageLog1.dat");
+ SpyTxLog tm = new SpyTxLog("SpyTxManager4.dat");
+ SpyMessageQueue log = new SpyMessageQueue(null, "SpyMessageLog4.dat",
"SpyMessageLogDir4");
try{
@@ -58,14 +58,26 @@
add(log, second+1, null);
System.exit(0);
+
+ } catch ( Exception e ) {
+ e.printStackTrace();
+ if( e instanceof javax.jms.JMSException &&
((javax.jms.JMSException)e).getLinkedException()!=null ) {
+
((javax.jms.JMSException)e).getLinkedException().printStackTrace();
+ }
+
} finally {
log.close();
+ tm.close();
}
}
+
+
+
+
- public static void add(SpyMessageLog log, long messageId, Long txid) throws
Exception {
+ public static void add(SpyMessageQueue log, long messageId, Long txid) throws
Exception {
SpyTextMessage m = new SpyTextMessage();
m.messageId = messageId;
@@ -73,9 +85,9 @@
System.out.println("Adding message: "+m+",tx="+txid);
log.add(m,txid);
- }
-
- public static void remove(SpyMessageLog log, long messageId, Long txid) throws
Exception {
+ }
+
+ public static void remove(SpyMessageQueue log, long messageId, Long txid)
throws Exception {
SpyTextMessage m = new SpyTextMessage();
m.messageId = messageId;
1.1 spyderMQ/src/java/org/spydermq/persistence/SpyMessageQueue.java
Index: SpyMessageQueue.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.persistence;
import java.io.IOException;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.BufferedInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectInputStream;
import javax.jms.JMSException;
import org.spydermq.SpyMessage;
import org.spydermq.server.PersistenceManager;
import org.spydermq.Log;
/**
* This is used to persist SpyMessages arriving and leaving
* a queue. It uses a log that can be used reconstruct the queue
* in case of provider failure. Messages are stored in
* a given directory. A SpyMessageLog is used maintain
* the list of (transactionalyt) valid messages in the queue.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class SpyMessageQueue {
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
File messageDirectory;
SpyMessageLog messageLog;
PersistenceManager persistenceManager;
static class DeleteFileTask implements Runnable {
DeleteFileTask(File f) {
file = f;
}
File file;
public void run() {
if( !file.delete() )
Log.notice("Could not remove the file: "+file);
}
}
/////////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////////
public SpyMessageQueue(PersistenceManager pm, String fileName, String
messageDir) throws JMSException {
persistenceManager = pm;
messageLog = new SpyMessageLog( fileName );
messageDirectory = new File( messageDir );
if( !messageDirectory.exists() ) {
messageDirectory.mkdirs();
messageDirectory.mkdir();
}
if( !messageDirectory.isDirectory() ) {
throwJMSException("Invalid message directory:
"+messageDirectory, null);
}
}
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
synchronized public void close() throws JMSException {
messageLog.close();
}
synchronized public void remove( SpyMessage message, Long transactionId )
throws JMSException {
messageLog.remove(message, transactionId);
File f = messageIdToFile(message.messageId);
if( persistenceManager == null || transactionId == null ) {
if( !f.delete() )
Log.notice("Could not remove the file: "+f);
} else {
persistenceManager.addPostCommitTask(transactionId, new
DeleteFileTask(f) );
}
}
synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws
JMSException {
Long messageIndex[] = messageLog.restore( commited );
java.util.HashSet files = new java.util.HashSet();
// Get a listing of all the files in the directory
{
File t[] = messageDirectory.listFiles();
for( int i=0; i < t.length; i++ )
files.add( t[i] );
}
// Read in all the messages
SpyMessage messages[] = new SpyMessage[ messageIndex.length ];
for( int i=0; i < messageIndex.length; i++ ) {
File file = messageIdToFile( messageIndex[i].longValue() );
files.remove(file);
try {
ObjectInputStream is = new ObjectInputStream(
new BufferedInputStream(
new FileInputStream( file ) ) );
messages[i] = (SpyMessage)is.readObject();
messages[i].messageId = messageIndex[i].longValue();
is.close();
} catch (Exception e ) {
throwJMSException("Could not restore a persisted
message", e);
}
}
// All the files left in the listing are messages that should
// be deleted
java.util.Iterator iter = files.iterator();
while( iter.hasNext() ) {
File f = (File)iter.next();
if( !f.delete() )
Log.notice("Could not remove the file: "+f);
}
return messages;
}
private void throwJMSException(String message, Exception e) throws
JMSException {
JMSException newE = new JMSException(message);
newE.setLinkedException(e);
throw newE;
}
synchronized public File messageIdToFile( long messageId ) throws JMSException
{
return new File(messageDirectory, "message_"+messageId+".dat");
}
synchronized public File add( SpyMessage message, Long transactionId ) throws
JMSException {
File f = messageIdToFile(message.messageId);
try {
ObjectOutputStream os = new ObjectOutputStream(
new BufferedOutputStream(
new FileOutputStream(f) ) );
os.writeObject(message);
os.close();
if( persistenceManager != null && transactionId != null ) {
persistenceManager.addPostRollbackTask(transactionId,
new DeleteFileTask(f) );
}
} catch ( IOException e ) {
throwJMSException("Could not persist the message", e);
}
messageLog.add( message, transactionId );
return f;
}
/**
* Insert the method's description here.
* Creation date: (1/10/01 7:27:11 AM)
* @return java.io.File
*/
public java.io.File getMessageDirectory() {
return messageDirectory;
}
}