User: hiram   
  Date: 00/12/15 19:27:50

  Modified:    src/java/org/spydermq/persistence SpyMessageLog.java
                        SpyMessageLogTester.java
  Added:       src/java/org/spydermq/persistence IntegrityLog.java
                        ObjectIntegrityLog.java SpyTxLog.java
  Removed:     src/java/org/spydermq/persistence TransactionLog.java
  Log:
  Better persistence and Transactions!
  Transactions work like the should, all operations get done, or none at all
  (It should even be rolling back persistent messages that were logged that were part
  of a transaction that did not complete due to abnormal server termination).
  You can also configure in what directory you would like to
  store the persistence data at.
  
  Revision  Changes    Path
  1.2       +88 -111   spyderMQ/src/java/org/spydermq/persistence/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/persistence/SpyMessageLog.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyMessageLog.java        2000/11/22 17:43:32     1.1
  +++ SpyMessageLog.java        2000/12/16 03:27:48     1.2
  @@ -6,188 +6,165 @@
    */
   package org.spydermq.persistence;
   
  -import java.io.InputStream;
  -import java.io.OutputStream;
  -import java.io.ObjectInputStream;
  -import java.io.ObjectOutputStream;
  -import java.io.BufferedInputStream;
  -import java.io.BufferedOutputStream;
   import java.io.IOException;
  -import org.spydermq.SpyMessage;
   import java.io.Serializable;
  -import java.util.HashMap;
  +
  +import org.spydermq.SpyMessage;
   
   import javax.jms.JMSException;
   
   /**
    * This is used to keep a log of SpyMessages arriving and leaving 
    * a queue.  The log can be used reconstruct the queue in case of
  - * provider failure.  Transactional integrety is kept by the use
  - * of a TransactionLog.
  + * provider failure.  Integrety is kept by the use of an ObjectIntegrityLog.
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $    
  + * @version $Revision: 1.2 $    
    */
   public class SpyMessageLog {
   
        /////////////////////////////////////////////////////////////////////
        // Attributes
  -     /////////////////////////////////////////////////////////////////////
  -     private ObjectOutputStream out;
  -     private TransactionLog transactionLog;
  +     /////////////////////////////////////////////////////////////////////   
  +     private ObjectIntegrityLog transactionLog;
        private MessageAddedRecord messageAddedRecord = new MessageAddedRecord();
        private MessageRemovedRecord messageRemovedRecord = new MessageRemovedRecord();
   
  -
  -     /////////////////////////////////////////////////////////////////////
  -     // Helper Inner classes
        /////////////////////////////////////////////////////////////////////
  -     static class MyObjectOutputStream extends ObjectOutputStream {
  -             MyObjectOutputStream( OutputStream os )  throws IOException {
  -                     super(os);
  -             }
  -             /**
  -              * diable the writing of the stream header.
  -              */
  -             protected void writeStreamHeader() {
  -             }
  -     }
  -     
  -     static class MyObjectInputStream extends ObjectInputStream      {
  -             MyObjectInputStream( InputStream is ) throws IOException {
  -                     super(is);
  -             }
  -             /**
  -              * diable the reading of the stream header.
  -              */
  -             protected void readStreamHeader() {
  -             }
  -     }
  -     
  +     // Helper Inner Classes
  +     /////////////////////////////////////////////////////////////////////   
        static class MessageAddedRecord implements Serializable {
                long messageId;
  +             boolean isTransacted;
  +             long transactionId;
                SpyMessage message;
        }
        
        static class MessageRemovedRecord implements Serializable {
  +             boolean isTransacted;
  +             long transactionId;
                long messageId;
        }
   
  -     
  -     
  +             
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public SpyMessageLog(String fileName) throws JMSException {
                try {
  -                     transactionLog = new TransactionLog(fileName);  
  -                     out = new MyObjectOutputStream( new BufferedOutputStream( 
transactionLog.getOutputStream() ));
  +                     transactionLog = new ObjectIntegrityLog(fileName);      
                } catch ( IOException e ) {
  -                     JMSException newE = new JMSException("Could not open the 
queue's tranaction log: "+fileName);
  -                     newE.setLinkedException(e);
  -                     throw newE;
  +                     throwJMSException("Could not open the queue's tranaction log: 
"+fileName,e);
                }
        }
   
        
  -
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
  -     public void logAddMessage( SpyMessage message ) throws JMSException {
  +     synchronized public void close() throws JMSException {
                try{
  +                     transactionLog.close();         
  +             } catch ( IOException e ) {
  +                     throwJMSException("Could not close the queue's tranaction 
log.",e);
  +             }
  +     }
  +
  +     synchronized public void add( SpyMessage message, Long transactionId ) throws 
JMSException {
  +             try{
  +                     
                        messageAddedRecord.message = message;
                        messageAddedRecord.messageId = message.messageId;
  -                     out.writeObject(messageAddedRecord);
  -                     out.reset();
  +                     if( transactionId == null )     {
  +                             messageAddedRecord.isTransacted = false;
  +                     } else {
  +                             messageAddedRecord.isTransacted = true;
  +                             messageAddedRecord.transactionId = 
transactionId.longValue();
  +                     }
  +                             
  +                     transactionLog.add(messageAddedRecord);
  +                     transactionLog.commit();
  +                     
                } catch ( IOException e ) {
  -                     JMSException newE = new JMSException("Could not write to the 
tranaction log.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  +                     throwJMSException("Could not write to the tranaction log.",e);
                }
                
  -     }
  +     }       
        
  -     public void logRemoveMessage( SpyMessage message ) throws JMSException {
  -             try{    
  +     synchronized public void remove( SpyMessage message, Long transactionId ) 
throws JMSException {
  +             try{
  +                     
                        messageRemovedRecord.messageId = message.messageId;
  -                     out.writeObject(messageRemovedRecord);
  -                     out.reset();
  +                     if( transactionId == null ) {
  +                             messageRemovedRecord.isTransacted = false;
  +                     } else {
  +                             messageRemovedRecord.isTransacted = true;
  +                             messageRemovedRecord.transactionId = 
transactionId.longValue();
  +                     }
  +                     transactionLog.add(messageRemovedRecord);
  +                     transactionLog.commit();
  +                     
                } catch ( IOException e ) {
  -                     JMSException newE = new JMSException("Could not write to the 
queue's tranaction log.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  +                     throwJMSException("Could not write to the queue's tranaction 
log.",e);
                }
   
  -     }
  +     }       
        
  -     public SpyMessage[] rebuildMessagesFromLog() throws JMSException {
  -             HashMap messages = new HashMap();
  +     synchronized public SpyMessage[] restore(java.util.TreeSet commited) throws 
JMSException {
   
  +             java.util.HashMap messageIndex = new java.util.HashMap();
  +                     
                try {   
  -                     ObjectInputStream in = new MyObjectInputStream( new 
BufferedInputStream( transactionLog.getInputStream() ));
  -                     try {
  -                     while (true) {
  +                     ObjectIntegrityLog.IndexItem objects[] = 
transactionLog.toIndex();
  +                     
  +                     for( int i=0; i < objects.length; i++ ) {
                                
  -                             Object o = in.readObject();
  +                             Object o = objects[i].record;
                                if( o instanceof MessageAddedRecord ) {
                                        
                                        MessageAddedRecord r = (MessageAddedRecord)o;
                                        r.message.messageId = r.messageId;
  -                                     messages.put( new Long(r.messageId), 
r.message);
  +
  +                                     if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
  +                                             // the TX this message was part of was 
not
  +                                             // commited... so drop this message
  +                                             continue;
  +                                     }
  +                                     
  +                                     messageIndex.put( new Long(r.messageId), 
objects[i]);
                                        
                                } else if( o instanceof MessageRemovedRecord ) {
                                        
                                        MessageRemovedRecord r = 
(MessageRemovedRecord)o;
  -                                     messages.remove( new Long(r.messageId));
  +
  +                                     if( r.isTransacted && !commited.contains(new 
Long(r.transactionId)) ) {
  +                                             // the TX this message was part of was 
not
  +                                             // commited... so drop this message
  +                                             continue;
  +                                     }
                                        
  +                                     messageIndex.remove( new Long(r.messageId));
  +                                     
                                }
                                
                        }
  -                     } catch( java.io.EOFException e ) {
  -                     } 
  -                     in.close();
                } catch ( Exception e ) {
  -                     JMSException newE = new JMSException("Could not rebuild the 
queue from the queue's tranaction log.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  +                     throwJMSException("Could not rebuild the queue from the 
queue's tranaction log.",e);
                }
   
  -             SpyMessage rc[] = new SpyMessage[messages.size()];
  -             return (SpyMessage [])messages.values().toArray(rc);            
  +             SpyMessage rc[] = new SpyMessage[messageIndex.size()];
  +             java.util.Iterator iter = messageIndex.values().iterator();
  +             for( int i=0; iter.hasNext(); i++ ) {
  +                     ObjectIntegrityLog.IndexItem item = 
(ObjectIntegrityLog.IndexItem)iter.next();
  +                     rc[i] = ((MessageAddedRecord)item.record).message;
  +             }
  +             return rc;              
  +     }       
  +     
  +     private void throwJMSException(String message, Exception e) throws 
JMSException {
  +             JMSException newE = new JMSException(message);
  +             newE.setLinkedException(e);
  +             throw newE;             
        }
  -
  -     public void commit() throws JMSException {
  -             try {
  -                     out.flush();
  -                     transactionLog.commit();
  -             } catch ( IOException e ) {
  -                     JMSException newE = new JMSException("Could not commit to the 
queue transaction log");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             }
  -             
  -     }
  -     
  -     public void rollback() throws JMSException {
  -             try {
  -                     out.flush();
  -                     transactionLog.rollback();
  -             } catch ( IOException e ) {
  -                     JMSException newE = new JMSException("Could not rollback the 
queue's tranaction log.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             }               
  -     }
        
  -     public void close() throws JMSException {
  -             try{
  -                     transactionLog.close();         
  -             } catch ( IOException e ) {
  -                     JMSException newE = new JMSException("Could not close the 
queue's tranaction log.");
  -                     newE.setLinkedException(e);
  -                     throw newE;
  -             }
  -     }
  -
   }
  
  
  
  1.2       +25 -23    
spyderMQ/src/java/org/spydermq/persistence/SpyMessageLogTester.java
  
  Index: SpyMessageLogTester.java
  ===================================================================
  RCS file: 
/products/cvs/ejboss/spyderMQ/src/java/org/spydermq/persistence/SpyMessageLogTester.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SpyMessageLogTester.java  2000/11/22 17:43:32     1.1
  +++ SpyMessageLogTester.java  2000/12/16 03:27:48     1.2
  @@ -9,26 +9,27 @@
   import org.spydermq.*;
   
   /**
  - * This class was used to perform unit testing on the SpyMessageLog
  + * This class was used to perform unit testing on the SpyMessageLog/SpyTxLog
    * 
    *
    * @author: Hiram Chirino ([EMAIL PROTECTED])
  - * @version $Revision: 1.1 $    
  + * @version $Revision: 1.2 $    
    */
   public class SpyMessageLogTester {
   
  -     /**
  +             /**
         * Starts the application.
         * @param args an array of command-line arguments
         */
        public static void main(java.lang.String[] args) throws Exception {
   
  -             SpyMessageLog log = new SpyMessageLog("SpyMessage20.log");
  +             SpyTxLog tm = new SpyTxLog("SpyTxManager1.dat");
  +             SpyMessageLog log = new SpyMessageLog("SpyMessageLog1.dat");
                
                try{    
   
  -                             
  -                     SpyMessage[] queue = log.rebuildMessagesFromLog();
  +                     java.util.TreeSet commited = tm.restore();                     
         
  +                     SpyMessage[] queue = log.restore(commited);
   
                        System.out.println("Recovered :"+queue.length+" message from 
the message log");
                        long maxMessageId=0;
  @@ -36,25 +37,25 @@
                                System.out.println("  #"+i+": "+queue[i]);
                                maxMessageId = Math.max(maxMessageId, 
queue[i].messageId );
                        }
  +
  +                     Long tx1 = tm.createTx();
                        
                        long first = ++maxMessageId;
  -                     add(log, first);
  +                     add(log, first,tx1);
                        long second = ++maxMessageId;
  -                     add(log, second);
  -                     remove(log, first);
  +                     add(log, second, tx1);
  +                     remove(log, first, tx1);
   
                        System.out.println("Commiting");
  -                     log.commit();
  +                     tm.commitTx(tx1);
   
  -                     add(log, first);
  +                     Long tx2 = tm.createTx();
  +                     add(log, first,tx2);
   
                        System.out.println("Rolling back");
  -                     log.rollback();
  -
  -                     add(log, second+1);
  +                     tm.rollbackTx(tx2);
   
  -                     System.out.println("Commiting");
  -                     log.commit();
  +                     add(log, second+1, null);
                        
                        System.exit(0);
                } finally {
  @@ -63,23 +64,24 @@
   
        }
   
  -     public static void add(SpyMessageLog log, long messageId) throws Exception {
   
  +     public static void add(SpyMessageLog log, long messageId, Long txid) throws 
Exception {
  +
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
                m.setText("Hello World #"+m.messageId);
  -             System.out.println("Adding message: "+m);
  -             log.logAddMessage(m);
  +             System.out.println("Adding message: "+m+",tx="+txid);
  +             log.add(m,txid);
   
  -     }
  +     }       
        
  -     public static void remove(SpyMessageLog log, long messageId) throws Exception {
  +     public static void remove(SpyMessageLog log, long messageId, Long txid) throws 
Exception {
   
                SpyTextMessage m = new SpyTextMessage();
                m.messageId = messageId;
                m.setText("Hello World #"+m.messageId);
  -             System.out.println("Removing message: "+m);
  -             log.logRemoveMessage(m);
  +             System.out.println("Removing message: "+m+", tx="+txid);
  +             log.remove(m,txid);
   
        }
   }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/persistence/IntegrityLog.java
  
  Index: IntegrityLog.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.persistence;
  
  import java.io.RandomAccessFile;
  import java.io.OutputStream;
  import java.io.InputStream;
  import java.io.IOException;
  import java.io.File;
  
  
  /**
   * This class is used to create a log file which which will will garantee
   * it's integrety up to the last commit point.
   *
   * The InputStream returned by getInputStream() will read 
   * data placed into the log with the OutputStream returned by
   * getOutputStream().  The EOF for the InputStream is the 
   * last commited point of the OutputStream.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class IntegrityLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private static final int HEADER_SIZE=16; // in bytes
  
        // Header related stuff 
        private long firstRecordPos;
        private long nextRecordPos;
        private byte headerBytes[]=new byte[HEADER_SIZE];
        
        private RandomAccessFile raf;
  
        private LogOutputStream outputStream;
        private LogInputStream inputStream;
  
        
        /////////////////////////////////////////////////////////////////////
        // Helper Inner Classes
        /////////////////////////////////////////////////////////////////////
        class LogInputStream extends InputStream {
                
                boolean closed = false;
                long inputPos = 0;
  
                public long getFilePointer() {
                        return inputPos;
                }
                public void close() throws IOException {
                        super.close();
                        closed = true;
                }
                public int read() throws IOException {
                        inputPos = Math.max(inputPos, firstRecordPos);
                        int rc = IntegrityLog.this.read(inputPos);
                        if( rc >= 0 )
                                inputPos ++;
                        return rc;
                }
                public int read(byte bytes[], int off, int len) throws IOException {
                        inputPos = Math.max(inputPos, firstRecordPos);
                        int rc = IntegrityLog.this.read(inputPos, bytes, off, len);
                        if( rc >= 0 )
                                inputPos += rc;
                        return rc;
                }
        } 
        
        class LogOutputStream extends OutputStream {
                boolean closed = false;
                public long getFilePointer() {
                        return nextRecordPos;
                }
                public void close() throws IOException {
                        super.close();
                        closed = true;
                }
                public void write(int b) throws IOException {
                        IntegrityLog.this.write( (byte)b );
                }
                public void write(byte bytes[], int off, int len) throws IOException {
                        IntegrityLog.this.write( bytes, off, len );
                }
        }
  
  
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public IntegrityLog(String fileName) throws IOException {
                File f = new File(fileName);
                boolean exists = f.isFile();
                
                raf = new RandomAccessFile(f, "rw");
                if( exists ) {
                        loadHeader();
                } else {
                        initHeader();
                }
        }
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods 
        /////////////////////////////////////////////////////////////////////   
        public LogInputStream getInputStream() {
                if ( inputStream==null || inputStream.closed ) {
                        inputStream = new LogInputStream();
                }
                return inputStream;
        }       
        
        public LogOutputStream getOutputStream() throws IOException {
                if ( outputStream==null || outputStream.closed ) {
                        outputStream = new LogOutputStream();
                }
                return outputStream;
        }
        
        public void commit() throws IOException {
  
                headerBytes[0] = (byte)((firstRecordPos >>> 56) & 0xFF);
                headerBytes[1] = (byte)((firstRecordPos >>> 48) & 0xFF);
                headerBytes[2] = (byte)((firstRecordPos >>> 40) & 0xFF);
                headerBytes[3] = (byte)((firstRecordPos >>> 32) & 0xFF);
                headerBytes[4] = (byte)((firstRecordPos >>> 24) & 0xFF);
                headerBytes[5] = (byte)((firstRecordPos >>> 16) & 0xFF);
                headerBytes[6] = (byte)((firstRecordPos >>>  8) & 0xFF);
                headerBytes[7] = (byte)((firstRecordPos >>>  0) & 0xFF);
                headerBytes[8] = (byte)((nextRecordPos >>> 56) & 0xFF);
                headerBytes[9] = (byte)((nextRecordPos >>> 48) & 0xFF);
                headerBytes[10] =(byte)((nextRecordPos >>> 40) & 0xFF);
                headerBytes[11] =(byte)((nextRecordPos >>> 32) & 0xFF);
                headerBytes[12] =(byte)((nextRecordPos >>> 24) & 0xFF);
                headerBytes[13] =(byte)((nextRecordPos >>> 16) & 0xFF);
                headerBytes[14] =(byte)((nextRecordPos >>>  8) & 0xFF);
                headerBytes[15] =(byte)((nextRecordPos >>>  0) & 0xFF);
  
                raf.seek(0);
                raf.write(headerBytes);
        }
        
        public void rollback() throws IOException {
                loadHeader();
        }
        
        public void close() throws IOException {
                raf.close();
                raf = null;
        }
  
        /////////////////////////////////////////////////////////////////////
        // Private Methods 
        /////////////////////////////////////////////////////////////////////          
 
        private long getBytesLeft(long offset) {
  
                return nextRecordPos-offset;
                
        }
        
        private void initHeader() throws IOException {
                
                firstRecordPos = HEADER_SIZE;
                nextRecordPos = HEADER_SIZE;
  
                commit();
        }
        
        private void loadHeader() throws IOException {
  
                raf.seek(0);
                firstRecordPos = raf.readLong();
                nextRecordPos = raf.readLong();
                
        }
        
        private int read(long offset) throws IOException {
  
                if( offset >= nextRecordPos )
                        return -1;
                
                if( raf.getFilePointer() != offset ) {
                        raf.seek(offset);
                }
  
                int rc = raf.read();
                return rc;
                
        }
        
        private int read(long offset, byte bytes[], int off, int len) throws 
IOException {
  
                if( offset >= nextRecordPos )
                        return -1;
  
                len = (int)Math.min(len, getBytesLeft(offset));
                
                if( raf.getFilePointer() != offset ) {
                        raf.seek(offset);
                }
  
                int rc = raf.read(bytes, off, len);
                return rc;
                
        }
        
        private void write(byte []record, int off, int len) throws IOException {
                if( raf.getFilePointer() != nextRecordPos ) {
                        raf.seek(nextRecordPos);
                }
                
                raf.write(record, off, len);
                nextRecordPos+=len;
        }
        
        private void write(byte b) throws IOException {
                
                if( raf.getFilePointer() != nextRecordPos ) {
                        raf.seek(nextRecordPos);
                }
                
                raf.write(b);
                nextRecordPos++;
        }
        
  }
  
  
  
  1.1                  
spyderMQ/src/java/org/spydermq/persistence/ObjectIntegrityLog.java
  
  Index: ObjectIntegrityLog.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.persistence;
  
  import java.io.InputStream;
  import java.io.OutputStream;
  import java.io.ObjectInputStream;
  import java.io.ObjectOutputStream;
  import java.io.IOException;
  import java.io.Serializable;
  import java.io.BufferedInputStream;
  
  import javax.jms.JMSException;
  
  
  /**
   * This is used to keep a log of Serializable Objects with garanteed integrety.
   *
   * Every object add()ed to the log without an exception is garenteed
   * to be recovered by any of the to*() methods.  The log file will not be
   * corrupted if the process dies in the middle of an add().
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class ObjectIntegrityLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private IntegrityLog.LogOutputStream logOutputStream;
        private ObjectOutputStream out;
        private IntegrityLog transactionLog;
  
        static class IndexItem {
                long recordOffset;
                Object record;
        } 
  
  
        /////////////////////////////////////////////////////////////////////
        // Helper Inner classes
        /////////////////////////////////////////////////////////////////////
        static class MyObjectOutputStream extends ObjectOutputStream {
                MyObjectOutputStream(OutputStream os) throws IOException {
                        super(os);
                }
                /**
                 * diable the writing of the stream header.
                 */
                protected void writeStreamHeader() {
                }
        }
  
        static class MyObjectInputStream extends ObjectInputStream {
                MyObjectInputStream(InputStream is) throws IOException {
                        super(is);
                }
                /**
                 * diable the reading of the stream header.
                 */
                protected void readStreamHeader() {
                }
        }
  
        /////////////////////////////////////////////////////////////////////
        // Constructor
        /////////////////////////////////////////////////////////////////////
        public ObjectIntegrityLog(String fileName) throws IOException {
                transactionLog = new IntegrityLog(fileName);
                logOutputStream = transactionLog.getOutputStream();
                out = new MyObjectOutputStream(logOutputStream);
        }
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        public void commit() throws IOException {
                transactionLog.commit();
        }
  
        public void rollback() throws IOException {
                transactionLog.rollback();
        }
  
        public void close() throws IOException {
                transactionLog.close();
        }
        
        public IndexItem add(Object o) throws IOException {
                IndexItem item = new IndexItem();
                item.record = o;
                item.recordOffset = logOutputStream.getFilePointer();
  
                out.writeObject(o);
                out.reset();
                out.flush();
  
                return item;
        }
        
        public Object[] toArray() throws IOException, ClassNotFoundException {
                java.util.LinkedList ll = new java.util.LinkedList();
  
                ObjectInputStream in = new MyObjectInputStream(new 
BufferedInputStream(transactionLog.getInputStream()));
                try {
                        while (true) {
  
                                Object o = in.readObject();
                                ll.addLast(o);
  
                        }
                } catch (java.io.EOFException e) {
                }
                in.close();
  
                Object rc[] = new Object[ll.size()];
                return (Object[]) ll.toArray(rc);
        }
        
        public java.util.HashSet toHashSet() throws IOException, 
ClassNotFoundException {
                java.util.HashSet hash = new java.util.HashSet();
  
                ObjectInputStream in = new MyObjectInputStream(new 
BufferedInputStream(transactionLog.getInputStream()));
                try {
                        while (true) {
  
                                Object o = in.readObject();
                                hash.add(o);
  
                        }
                } catch (java.io.EOFException e) {
                }
                in.close();
  
                return hash;
        }
        
        public IndexItem[] toIndex() throws IOException, ClassNotFoundException {
                java.util.LinkedList ll = new java.util.LinkedList();
  
                IntegrityLog.LogInputStream logStream = 
transactionLog.getInputStream();
                ObjectInputStream in = new MyObjectInputStream(logStream);
  
                try {
                        while (true) {
  
                                IndexItem i = new IndexItem();
                                i.recordOffset = logStream.getFilePointer();
                                i.record = in.readObject();
                                ll.addLast(i);
  
                        }
                } catch (java.io.EOFException e) {
                }
                in.close();
  
                IndexItem rc[] = new IndexItem[ll.size()];
                return (IndexItem[]) ll.toArray(rc);
        }
        
        public java.util.TreeSet toTreeSet() throws IOException, 
ClassNotFoundException {
                java.util.TreeSet treeSet = new java.util.TreeSet();
  
                ObjectInputStream in = new MyObjectInputStream(new 
BufferedInputStream(transactionLog.getInputStream()));
                try {
                        while (true) {
  
                                Object o = in.readObject();
                                treeSet.add(o);
  
                        }
                } catch (java.io.EOFException e) {
                }
                in.close();
  
                return treeSet;
        }
        
        public java.util.Vector toVector() throws IOException, ClassNotFoundException {
                java.util.Vector vector = new java.util.Vector();
  
                ObjectInputStream in = new MyObjectInputStream(new 
BufferedInputStream(transactionLog.getInputStream()));
                try {
                        while (true) {
  
                                Object o = in.readObject();
                                vector.add(o);
  
                        }
                } catch (java.io.EOFException e) {
                }
                in.close();
  
                return vector;
        }
  }
  
  
  
  1.1                  spyderMQ/src/java/org/spydermq/persistence/SpyTxLog.java
  
  Index: SpyTxLog.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.persistence;
   
  import java.io.Serializable;
  import java.io.IOException;
  
  import javax.jms.JMSException;
  
  /**
   * This is used to keep a log of commited transactions.
   *
   * @author: Hiram Chirino ([EMAIL PROTECTED])
   * @version $Revision: 1.1 $    
   */
  public class SpyTxLog {
  
        /////////////////////////////////////////////////////////////////////
        // Attributes
        /////////////////////////////////////////////////////////////////////
        private ObjectIntegrityLog transactionLog;
        private long nextTransactionId = Long.MIN_VALUE;
        
        /////////////////////////////////////////////////////////////////////
        // Constructors
        /////////////////////////////////////////////////////////////////////
        public SpyTxLog(String fileName) throws JMSException {
                try {
                        transactionLog = new ObjectIntegrityLog(fileName);
                } catch (IOException e) {
                        throwJMSException("Could not open the queue's tranaction log: 
" + fileName, e);
                }
        }
  
        /////////////////////////////////////////////////////////////////////
        // Public Methods
        /////////////////////////////////////////////////////////////////////
        synchronized public void close() throws JMSException {
                try{
                        transactionLog.close();         
                } catch ( IOException e ) {
                        throwJMSException("Could not close the queue's tranaction 
log.",e);
                }
        }
        
        synchronized public void commitTx(Long id) throws JMSException {
                                
                try {
                        transactionLog.add(id);
                        transactionLog.commit();
                } catch ( IOException e ) {
                        throwJMSException("Could not create a new transaction.",e);
                }
                
        }
        
        synchronized public Long createTx() throws JMSException {
                return new Long(nextTransactionId++);
        }
        
        synchronized public java.util.TreeSet restore() throws JMSException {
                
                java.util.TreeSet items=null;
                try {
                        items = transactionLog.toTreeSet();
                } catch ( Exception e ) {
                        throwJMSException("Could not restore the transaction log.",e);
                }               
  
                long maxId = Long.MIN_VALUE;
                java.util.Iterator iter = items.iterator();
                while( iter.hasNext() ) {
                        Long l = (Long)iter.next();
                        if( l.longValue() > maxId )
                                maxId = l.longValue();
                }
  
                nextTransactionId = maxId+1;
                return items;
                        
        }
  
        synchronized public void rollbackTx(Long txId) throws JMSException {
                        
        }
        
        /////////////////////////////////////////////////////////////////////
        // Private Methods
        /////////////////////////////////////////////////////////////////////
        private void throwJMSException(String message, Exception e) throws 
JMSException {
                JMSException newE = new JMSException(message);
                newE.setLinkedException(e);
                throw newE;             
        }
        
  }
  
  
  

Reply via email to