User: hiram
Date: 00/11/22 09:43:33
Added: src/java/org/spydermq/persistence SpyMessageLog.java
SpyMessageLogTester.java TransactionLog.java
Log:
Added a crude persistence scheme to support persistent messages.
Its transactional behavior still needs some work.
Revision Changes Path
1.1 spyderMQ/src/java/org/spydermq/persistence/SpyMessageLog.java
Index: SpyMessageLog.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.persistence;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import org.spydermq.SpyMessage;
import java.io.Serializable;
import java.util.HashMap;
import javax.jms.JMSException;
/**
* This is used to keep a log of SpyMessages arriving at and leaving
* a queue. The log can be used to reconstruct the queue in case of
* provider failure. Transactional integrity is kept by the use
* of a TransactionLog.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class SpyMessageLog {
/////////////////////////////////////////////////////////////////////
// Attributes
/////////////////////////////////////////////////////////////////////
private ObjectOutputStream out;
private TransactionLog transactionLog;
private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
/////////////////////////////////////////////////////////////////////
// Helper Inner classes
/////////////////////////////////////////////////////////////////////
static class MyObjectOutputStream extends ObjectOutputStream {
MyObjectOutputStream( OutputStream os ) throws IOException {
super(os);
}
/**
* diable the writing of the stream header.
*/
protected void writeStreamHeader() {
}
}
static class MyObjectInputStream extends ObjectInputStream {
MyObjectInputStream( InputStream is ) throws IOException {
super(is);
}
/**
* diable the reading of the stream header.
*/
protected void readStreamHeader() {
}
}
static class MessageAddedRecord implements Serializable {
long messageId;
SpyMessage message;
}
static class MessageRemovedRecord implements Serializable {
long messageId;
}
/////////////////////////////////////////////////////////////////////
// Constructor
/////////////////////////////////////////////////////////////////////
public SpyMessageLog(String fileName) throws JMSException {
try {
transactionLog = new TransactionLog(fileName);
out = new MyObjectOutputStream( new BufferedOutputStream(
transactionLog.getOutputStream() ));
} catch ( IOException e ) {
JMSException newE = new JMSException("Could not open the
queue's tranaction log: "+fileName);
newE.setLinkedException(e);
throw newE;
}
}
/////////////////////////////////////////////////////////////////////
// Public Methods
/////////////////////////////////////////////////////////////////////
public void logAddMessage( SpyMessage message ) throws JMSException {
try{
messageAddedRecord.message = message;
messageAddedRecord.messageId = message.messageId;
out.writeObject(messageAddedRecord);
out.reset();
} catch ( IOException e ) {
JMSException newE = new JMSException("Could not write to the
tranaction log.");
newE.setLinkedException(e);
throw newE;
}
}
public void logRemoveMessage( SpyMessage message ) throws JMSException {
try{
messageRemovedRecord.messageId = message.messageId;
out.writeObject(messageRemovedRecord);
out.reset();
} catch ( IOException e ) {
JMSException newE = new JMSException("Could not write to the
queue's tranaction log.");
newE.setLinkedException(e);
throw newE;
}
}
public SpyMessage[] rebuildMessagesFromLog() throws JMSException {
HashMap messages = new HashMap();
try {
ObjectInputStream in = new MyObjectInputStream( new
BufferedInputStream( transactionLog.getInputStream() ));
try {
while (true) {
Object o = in.readObject();
if( o instanceof MessageAddedRecord ) {
MessageAddedRecord r = (MessageAddedRecord)o;
r.message.messageId = r.messageId;
messages.put( new Long(r.messageId),
r.message);
} else if( o instanceof MessageRemovedRecord ) {
MessageRemovedRecord r =
(MessageRemovedRecord)o;
messages.remove( new Long(r.messageId));
}
}
} catch( java.io.EOFException e ) {
}
in.close();
} catch ( Exception e ) {
JMSException newE = new JMSException("Could not rebuild the
queue from the queue's tranaction log.");
newE.setLinkedException(e);
throw newE;
}
SpyMessage rc[] = new SpyMessage[messages.size()];
return (SpyMessage [])messages.values().toArray(rc);
}
public void commit() throws JMSException {
try {
out.flush();
transactionLog.commit();
} catch ( IOException e ) {
JMSException newE = new JMSException("Could not commit to the
queue transaction log");
newE.setLinkedException(e);
throw newE;
}
}
public void rollback() throws JMSException {
try {
out.flush();
transactionLog.rollback();
} catch ( IOException e ) {
JMSException newE = new JMSException("Could not rollback the
queue's tranaction log.");
newE.setLinkedException(e);
throw newE;
}
}
public void close() throws JMSException {
try{
transactionLog.close();
} catch ( IOException e ) {
JMSException newE = new JMSException("Could not close the
queue's tranaction log.");
newE.setLinkedException(e);
throw newE;
}
}
}
1.1
spyderMQ/src/java/org/spydermq/persistence/SpyMessageLogTester.java
Index: SpyMessageLogTester.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.persistence;
import org.spydermq.*;
/**
* This class was used to perform unit testing on the SpyMessageLog
*
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class SpyMessageLogTester {
/**
* Starts the application.
* @param args an array of command-line arguments
*/
public static void main(java.lang.String[] args) throws Exception {
SpyMessageLog log = new SpyMessageLog("SpyMessage20.log");
try{
SpyMessage[] queue = log.rebuildMessagesFromLog();
System.out.println("Recovered :"+queue.length+" message from
the message log");
long maxMessageId=0;
for( int i=0; i < queue.length; i++ ) {
System.out.println(" #"+i+": "+queue[i]);
maxMessageId = Math.max(maxMessageId,
queue[i].messageId );
}
long first = ++maxMessageId;
add(log, first);
long second = ++maxMessageId;
add(log, second);
remove(log, first);
System.out.println("Commiting");
log.commit();
add(log, first);
System.out.println("Rolling back");
log.rollback();
add(log, second+1);
System.out.println("Commiting");
log.commit();
System.exit(0);
} finally {
log.close();
}
}
public static void add(SpyMessageLog log, long messageId) throws Exception {
SpyTextMessage m = new SpyTextMessage();
m.messageId = messageId;
m.setText("Hello World #"+m.messageId);
System.out.println("Adding message: "+m);
log.logAddMessage(m);
}
public static void remove(SpyMessageLog log, long messageId) throws Exception {
SpyTextMessage m = new SpyTextMessage();
m.messageId = messageId;
m.setText("Hello World #"+m.messageId);
System.out.println("Removing message: "+m);
log.logRemoveMessage(m);
}
}
1.1 spyderMQ/src/java/org/spydermq/persistence/TransactionLog.java
Index: TransactionLog.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.persistence;
import java.io.RandomAccessFile;
import java.io.OutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.File;
/**
* This class is used to log transactions.
*
* The InputStream returned by getInputStream() will read
* data placed into the log with the OutputStream returned by
* getOutputStream(). The EOF for the InputStream is the
* last committed point of the OutputStream.
*
* @author: Hiram Chirino ([EMAIL PROTECTED])
* @version $Revision: 1.1 $
*/
public class TransactionLog {

    /////////////////////////////////////////////////////////////////////
    // Attributes
    /////////////////////////////////////////////////////////////////////

    /** Size in bytes of the file header: two big-endian longs. */
    private static final int HEADER_SIZE = 16;

    // Header related stuff

    /** File offset of the first record byte; reads never start before it. */
    private long firstRecordPos;

    /**
     * File offset where the next written byte goes; reads stop here. The
     * in-memory value moves with every write, the on-disk copy only on
     * commit().
     */
    private long nextRecordPos;

    /** Scratch buffer so the header goes out in a single write. */
    private byte headerBytes[] = new byte[HEADER_SIZE];

    /** Backing file; null once the log has been closed. */
    private RandomAccessFile raf;

    private LogOutputStream outputStream;
    private LogInputStream inputStream;

    /////////////////////////////////////////////////////////////////////
    // Helper Inner Classes
    /////////////////////////////////////////////////////////////////////

    /**
     * Reads log bytes from firstRecordPos up to nextRecordPos while keeping
     * its own position, independent of the file pointer.
     */
    class LogInputStream extends InputStream {
        boolean closed = false;
        long inputPos = 0;

        /** Marks the stream closed so getInputStream() creates a new one. */
        public void close() throws IOException {
            super.close();
            closed = true;
        }

        public int read() throws IOException {
            // A new stream starts at 0; skip over the header.
            inputPos = Math.max(inputPos, firstRecordPos);
            int rc = TransactionLog.this.read(inputPos);
            if (rc >= 0) {
                inputPos++;
            }
            return rc;
        }

        public int read(byte bytes[], int off, int len) throws IOException {
            if (len == 0) {
                // InputStream contract: a zero-length read returns 0,
                // even at end of stream.
                return 0;
            }
            inputPos = Math.max(inputPos, firstRecordPos);
            int rc = TransactionLog.this.read(inputPos, bytes, off, len);
            if (rc >= 0) {
                inputPos += rc;
            }
            return rc;
        }
    }

    /** Appends every byte written to it at nextRecordPos. */
    class LogOutputStream extends OutputStream {
        boolean closed = false;

        /** Marks the stream closed so getOutputStream() creates a new one. */
        public void close() throws IOException {
            super.close();
            closed = true;
        }

        public void write(int b) throws IOException {
            TransactionLog.this.write((byte) b);
        }

        public void write(byte bytes[], int off, int len) throws IOException {
            TransactionLog.this.write(bytes, off, len);
        }
    }

    /////////////////////////////////////////////////////////////////////
    // Constructor
    /////////////////////////////////////////////////////////////////////

    /**
     * Opens the log file, creating it if it does not exist. A missing or
     * zero-length file gets a fresh header; otherwise the stored header is
     * loaded.
     *
     * @param fileName path of the log file
     * @throws IOException if the file cannot be opened or its header cannot
     *         be read; the file handle is released before the exception
     *         propagates
     */
    public TransactionLog(String fileName) throws IOException {
        // Mode "rw" creates the file when it is missing.
        raf = new RandomAccessFile(new File(fileName), "rw");
        boolean ready = false;
        try {
            if (raf.length() == 0) {
                // Brand new, or created empty beforehand: there is no
                // header to load yet.
                initHeader();
            } else {
                loadHeader();
            }
            ready = true;
        } finally {
            if (!ready) {
                try {
                    raf.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
                raf = null;
            }
        }
    }

    /////////////////////////////////////////////////////////////////////
    // Public Methods
    /////////////////////////////////////////////////////////////////////

    /**
     * Returns the stream that appends to the log. The same instance is
     * handed out until it has been closed.
     */
    public OutputStream getOutputStream() throws IOException {
        if (outputStream == null || outputStream.closed) {
            outputStream = new LogOutputStream();
        }
        return outputStream;
    }

    /**
     * Returns the stream that reads the log. The same instance, with its
     * current read position, is handed out until it has been closed.
     */
    public InputStream getInputStream() {
        if (inputStream == null || inputStream.closed) {
            inputStream = new LogInputStream();
        }
        return inputStream;
    }

    /**
     * Writes the current record positions into the file header.
     *
     * NOTE(review): nothing here forces the file to disk - confirm whether
     * commit is expected to be durable.
     *
     * @throws IOException if the header cannot be written or the log is
     *         closed
     */
    public void commit() throws IOException {
        RandomAccessFile file = openFile();
        putLong(headerBytes, 0, firstRecordPos);
        putLong(headerBytes, 8, nextRecordPos);
        file.seek(0);
        file.write(headerBytes);
    }

    /**
     * Restores the record positions from the file header, so bytes written
     * since the last commit get overwritten by later writes.
     *
     * @throws IOException if the header cannot be read or the log is closed
     */
    public void rollback() throws IOException {
        loadHeader();
    }

    /**
     * Closes the backing file. Calling it again has no effect.
     *
     * @throws IOException if the file cannot be closed
     */
    public void close() throws IOException {
        if (raf == null) {
            return;
        }
        try {
            raf.close();
        } finally {
            raf = null;
        }
    }

    /////////////////////////////////////////////////////////////////////
    // Private Methods
    /////////////////////////////////////////////////////////////////////

    /** Returns the backing file, or fails cleanly if the log is closed. */
    private RandomAccessFile openFile() throws IOException {
        if (raf == null) {
            throw new IOException("Transaction log is closed");
        }
        return raf;
    }

    /** Stores value big-endian in dest[at .. at+7], as readLong() expects. */
    private static void putLong(byte[] dest, int at, long value) {
        for (int i = 7; i >= 0; i--) {
            dest[at + i] = (byte) (value & 0xFF);
            value >>>= 8;
        }
    }

    /** Number of readable bytes between offset and the end of the log. */
    private long getBytesLeft(long offset) {
        return nextRecordPos - offset;
    }

    /** Starts an empty log: both positions sit right after the header. */
    private void initHeader() throws IOException {
        firstRecordPos = HEADER_SIZE;
        nextRecordPos = HEADER_SIZE;
        commit();
    }

    /** Reads both record positions back from the start of the file. */
    private void loadHeader() throws IOException {
        RandomAccessFile file = openFile();
        file.seek(0);
        firstRecordPos = file.readLong();
        nextRecordPos = file.readLong();
    }

    /** Reads one byte at offset, or returns -1 at the end of the log. */
    private int read(long offset) throws IOException {
        if (offset >= nextRecordPos) {
            return -1;
        }
        RandomAccessFile file = openFile();
        if (file.getFilePointer() != offset) {
            file.seek(offset);
        }
        return file.read();
    }

    /**
     * Reads up to len bytes at offset, never past the end of the log.
     * Returns the number of bytes read, or -1 at the end of the log.
     */
    private int read(long offset, byte bytes[], int off, int len) throws
            IOException {
        if (offset >= nextRecordPos) {
            return -1;
        }
        len = (int) Math.min(len, getBytesLeft(offset));
        RandomAccessFile file = openFile();
        if (file.getFilePointer() != offset) {
            file.seek(offset);
        }
        return file.read(bytes, off, len);
    }

    /** Appends len bytes of record at nextRecordPos and advances it. */
    private void write(byte[] record, int off, int len) throws IOException {
        RandomAccessFile file = openFile();
        if (file.getFilePointer() != nextRecordPos) {
            file.seek(nextRecordPos);
        }
        file.write(record, off, len);
        nextRecordPos += len;
    }

    /** Appends one byte at nextRecordPos and advances it. */
    private void write(byte b) throws IOException {
        RandomAccessFile file = openFile();
        if (file.getFilePointer() != nextRecordPos) {
            file.seek(nextRecordPos);
        }
        file.write(b);
        nextRecordPos++;
    }
}