User: d_jencks
  Date: 01/11/10 13:38:05

  Modified:    src/main/org/jboss/mq/pm/file MessageLog.java
                        PersistenceManager.java
                        PersistenceManagerMBean.java
  Log:
  Changed MBean dependencies to work directly via MBean references; eliminated the depends tag from *service.xml files.
  
  Revision  Changes    Path
  1.6       +27 -13    jbossmq/src/main/org/jboss/mq/pm/file/MessageLog.java
  
  Index: MessageLog.java
  ===================================================================
  RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/MessageLog.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- MessageLog.java   2001/10/28 04:07:34     1.5
  +++ MessageLog.java   2001/11/10 21:38:04     1.6
  @@ -5,20 +5,23 @@
    * See terms of license at gnu.org.
    */
   package org.jboss.mq.pm.file;
  +
  +
  +
   import java.io.File;
   import java.io.FileInputStream;
   import java.io.FileOutputStream;
  -
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;
  -
  +import java.util.Map;
  +import java.util.TreeMap;
   import javax.jms.JMSException;
  -import org.jboss.mq.server.MessageReference ;
  -import org.jboss.mq.server.JMSServer;
  -
   import org.jboss.mq.*;
  +import org.jboss.mq.server.JMSServer;
  +import org.jboss.mq.server.MessageCache;
  +import org.jboss.mq.server.MessageReference ;
   
   /**
    *  This is used to keep SpyMessages on the disk and is used reconstruct the
  @@ -26,7 +29,7 @@
    *
    * @created    August 16, 2001
    * @author:    Paul Kendall ([EMAIL PROTECTED])
  - * @version    $Revision: 1.5 $
  + * @version    $Revision: 1.6 $
    */
   public class MessageLog {
   
  @@ -35,6 +38,8 @@
      /////////////////////////////////////////////////////////////////////
      private File     queueName;
   
  +   private MessageCache messageCache;
  +
      /////////////////////////////////////////////////////////////////////
      // Constants
      /////////////////////////////////////////////////////////////////////
  @@ -49,8 +54,15 @@
      /////////////////////////////////////////////////////////////////////
      // Constructor
      /////////////////////////////////////////////////////////////////////
  -   public MessageLog( File file )
  -      throws JMSException {
  +   public MessageLog(MessageCache messageCache, File file )
  +
  +   {
  +      if (messageCache == null) 
  +      {
  +         throw new IllegalArgumentException("Need a MessageCache to construct a 
MessageLog!");
  +      } // end of if ()
  +      
  +      this.messageCache = messageCache;
         queueName = file;
         queueName.mkdirs();
      }
  @@ -63,10 +75,10 @@
      }
   
   
  -   public MessageReference[] restore( java.util.TreeSet rollBackTXs )
  +   public Map restore( java.util.TreeSet rollBackTXs )
         throws JMSException {
         //use sorted map to get queue order right.
  -      java.util.TreeMap messageIndex = new java.util.TreeMap();
  +      TreeMap messageIndex = new TreeMap();
   
         try {
            File[] files = queueName.listFiles();
  @@ -89,13 +101,15 @@
         } catch ( Exception e ) {
            throwJMSException( "Could not rebuild the queue from the queue's 
tranaction log.", e );
         }
  -
  +      return messageIndex;
  +      /*
         MessageReference rc[] = new MessageReference[messageIndex.size()];
         java.util.Iterator iter = messageIndex.values().iterator();
         for ( int i = 0; iter.hasNext(); i++ ) {
            rc[i] = ( MessageReference )iter.next();
         }
         return rc;
  +      */
      }
   
      public void add( MessageReference messageRef, org.jboss.mq.pm.Tx transactionId )
  @@ -206,7 +220,7 @@
         out.close();
      }
   
  -   protected void restoreMessageFromFile( java.util.TreeMap store, File file )
  +   protected void restoreMessageFromFile(TreeMap store, File file )
         throws Exception {
         ObjectInputStream in = new ObjectInputStream( new FileInputStream( file ) );
         long msgId = in.readLong();
  @@ -238,7 +252,7 @@
         in.close();
         message.header.messageId = msgId;
         
  -      MessageReference mr = JMSServer.getInstance().getMessageCache().add(message);
  +      MessageReference mr = messageCache.add(message);
         mr.persistData = file;
         store.put( new Long( msgId ), mr );
      }
  
  
  
  1.9       +243 -23   jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManager.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- PersistenceManager.java   2001/10/28 04:07:34     1.8
  +++ PersistenceManager.java   2001/11/10 21:38:04     1.9
  @@ -5,44 +5,55 @@
    * See terms of license at gnu.org.
    */
   package org.jboss.mq.pm.file;
  +
  +
  +
  +
  +
  +
  +
  +
  +
   import java.io.File;
   import java.io.IOException;
  -
   import java.net.URL;
  +import java.util.Collection;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.LinkedList;
  +import java.util.Map;
   import java.util.TreeSet;
  -
   import javax.jms.JMSException;
  -
  -
  -
  -
  -
   import javax.management.*;
  -
   import org.jboss.mq.SpyDestination;
   import org.jboss.mq.SpyJMSException;
   import org.jboss.mq.SpyMessage;
  +import org.jboss.mq.SpyMessage;
   import org.jboss.mq.pm.TxManager;
   import org.jboss.mq.server.JMSDestination;
  +import org.jboss.mq.server.JMSQueue;
   import org.jboss.mq.server.JMSServer;
  -import org.jboss.system.ServiceMBeanSupport;
  -
  -import org.jboss.mq.SpyMessage;
  +import org.jboss.mq.server.JMSTopic;
  +import org.jboss.mq.server.PersistentQueue;
   import org.jboss.mq.server.MessageReference;
  +import org.jboss.mq.server.MessageCache;
  +import org.jboss.system.ServiceMBeanSupport;
   /**
    *  This class manages all persistence related services for file based
    *  persistence.
    *
    * @author     Paul Kendall ([EMAIL PROTECTED])
  - * @version    $Revision: 1.8 $
  + * @version    $Revision: 1.9 $
    */
   public class PersistenceManager extends ServiceMBeanSupport implements 
PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager
   {
   
      protected final static int MAX_POOL_SIZE = 50;
  +
  +
  +   private ObjectName messageCacheName;
  +   private MessageCache messageCache;
  +
      protected java.util.ArrayList txPool = new java.util.ArrayList();
   
      protected long tidcounter = Long.MIN_VALUE;
  @@ -56,6 +67,8 @@
      HashMap messageLogs = new HashMap();
      // Maps (Long)txIds to LinkedList of AddFile tasks
      HashMap transactedTasks = new HashMap();
  +   //Holds unrestored messages read from queues, indexed by queue name.
  +   Map unrestoredMessages = new HashMap();
   
      /**
       *  PersistenceManager constructor.
  @@ -67,6 +80,26 @@
         txManager = new TxManager(this);
      }
   
  +   public org.jboss.mq.pm.PersistenceManager getInstance()
  +   {
  +      return this;
  +   }
  +
  +   public ObjectName getMessageCache()
  +   {
  +      return messageCacheName;
  +   }
  +
  +   public void setMessageCache(ObjectName messageCache)
  +   {
  +      this.messageCacheName = messageCache;
  +   }
  +
  +   public MessageCache getMessageCacheInstance()
  +   {
  +      return messageCache;
  +   }
  +
      /**
       *  Sets the DataDirectory attribute of the PersistenceManager object
       *
  @@ -107,32 +140,197 @@
         return txManager;
      }
   
  -
      /**
       *  #Description of the Method
       *
       * @exception  Exception  Description of Exception
       */
  -   public void initService() throws Exception
  +   public void startService() throws Exception
      {
         File jbossHome = new File(System.getProperty("jboss.system.home"));
         dataDirFile = new File(jbossHome, dataDirectory);
         dataDirFile.mkdirs();
         if( !dataDirFile.isDirectory() )
            throw new Exception("The data directory is not valid: 
"+dataDirFile.getCanonicalPath());
  -      JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[]{}, new String[]{});
  -      server.setPersistenceManager(this);
  +      messageCache = (MessageCache)getServer().invoke(messageCacheName, 
"getInstance", new Object[] {}, new String[] {});
  +
  +      restoreTransactions();
  +
      }
   
      /**
  -    *  #Description of the Method
  +    * The <code>restoreTransactions</code> method is called when the 
  +    * PersistenceManager service is started.  It reads all transaction log
  +    * files, and pre-restores all messages that are committed and not read.
  +    * When a queue or topic is started, it will collect these pre-restored
  +    * messages and add them to its in memory queue.
       *
  -    * @exception  Exception  Description of Exception
  +    * @exception javax.jms.JMSException if an error occurs
       */
  -   public void startService() throws Exception
  +   private void restoreTransactions()  throws javax.jms.JMSException
  +   {
  +      //reconstruct TXs
  +      TreeSet txs = new TreeSet();
  +      File[] transactFiles = dataDirFile.listFiles();
  +      if(transactFiles != null)
  +      {
  +         for (int i = 0; i < transactFiles.length; i++)
  +         {
  +            // Set up a messageLog for each queue data directory.
  +            if( transactFiles[i].isDirectory() ) 
  +            {
  +               String dirName = transactFiles[i].toString();
  +               int start = transactFiles[i].getParent().length();
  +               String key = dirName.substring(start);
  +               MessageLog log = new MessageLog(messageCache, transactFiles[i]);
  +               LogInfo info = new LogInfo(log, null);
  +               synchronized (messageLogs)
  +               {
  +                  messageLogs.put(key, info);
  +               }
  +               transactFiles[i] = null;
  +               continue;
  +            }
  +             
  +            try
  +            {
  +               Long tx = new Long(Long.parseLong(transactFiles[i].getName()));
  +               java.util.ArrayList removingMessages = readTxFile(transactFiles[i]);
  +               if (testRollBackTx(tx, removingMessages))
  +               {
  +                  txs.add(tx);
  +               }
  +            }
  +            catch (NumberFormatException e)
  +            {
  +               log.warn("Ignoring invalid transaction record file " + 
transactFiles[i].getAbsolutePath());
  +               transactFiles[i] = null;
  +            }
  +            catch (IOException e)
  +            {
  +               JMSException jmse = new SpyJMSException("IO Error when restoring.");
  +               jmse.setLinkedException(e);
  +               throw jmse;
  +            }
  +         }
  +      }
  +      if (!txs.isEmpty())
  +      {
  +         this.tidcounter = ((Long)txs.last()).longValue() + 1;
  +      }
  +
  +      HashMap clone;
  +      synchronized (messageLogs)
  +      {
  +         clone = (HashMap)messageLogs.clone();
  +      }
  +
  +      for (Iterator i = clone.keySet().iterator(); i.hasNext();) 
  +      {
  +         Object key = i.next();
  +         LogInfo logInfo = (LogInfo)clone.get(key);
  +         unrestoredMessages.put(key, logInfo.log.restore(txs));
  +      }
  +
  +      //all txs now committed or rolled back so delete tx files
  +      if(transactFiles != null)
  +      {
  +         for (int i = 0; i < transactFiles.length; i++)
  +         {
  +            if (transactFiles[i] != null)
  +            {
  +               deleteTxFile(transactFiles[i]);
  +            }
  +         }
  +      }
  +
  +   }
  +
  +   /**
  +    * The <code>restoreDestination</code> method is called by a queue or 
  +    * topic on startup.  The method sends all the pre-restored messages to
  +    * the JMSDestination to get them back into the in-memory queue.
  +    *
  +    * @param jmsDest a <code>JMSDestination</code> value
  +    * @exception javax.jms.JMSException if an error occurs
  +    */
  +   public void restoreDestination(JMSDestination jmsDest) throws 
javax.jms.JMSException
  +   {
  +      if (jmsDest instanceof JMSQueue) 
  +      {
  +         SpyDestination spyDest = jmsDest.getSpyDestination();
  +         restoreQueue(jmsDest, spyDest);
  +      } // end of if ()
  +      else if (jmsDest instanceof JMSTopic) 
  +      {
  +         Collection persistQList = ((JMSTopic)jmsDest).getPersistentQueues();
  +         Iterator pq = persistQList.iterator();
  +         while (pq.hasNext()) 
  +         {
  +            SpyDestination spyDest = 
((PersistentQueue)pq.next()).getSpyDestination();
  +
  +            restoreQueue(jmsDest, spyDest);
  +
  +         } // end of while ()
  +         
  +      } // end of if ()
  +      
  +   }
  +
  +   /**
  +    * The <code>restoreQueue</code> method restores the messages for
  +    * a SpyDestination to its queue by sending them to the associated
  +    * JMSDestination.
  +    *
  +    * @param jmsDest a <code>JMSDestination</code> value
  +    * @param dest a <code>SpyDestination</code> value
  +    * @exception JMSException if an error occurs
  +    */
  +   public void restoreQueue(JMSDestination jmsDest, SpyDestination dest)
  +      throws JMSException
      {
  -      JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[]{}, new String[]{});
  -      restore(server);
  +
  +      //remember this queue
  +      String queueName = dest.toString();
  +      LogInfo info = (LogInfo)messageLogs.get(queueName);
  +      if (info == null) 
  +      {
  +         //must be new, set up directory etc.
  +         File logDir = new File(dataDirFile, queueName);
  +         MessageLog log = new MessageLog(messageCache, logDir);
  +         info = new LogInfo(log, dest);
  +         synchronized (messageLogs)
  +         {
  +            messageLogs.put(queueName, info);
  +         }
  +         
  +      } // end of if ()
  +      else 
  +      {
  +         info.destination = dest;
  +      } // end of else
  +      
  +      //restore the messages from old logs (previously read into unrestoredMessages)
  +      Map messages = (Map)unrestoredMessages.remove(queueName);
  +      if (messages != null) 
  +      {      
  +         synchronized (jmsDest)
  +         {
  +            Iterator m = messages.values().iterator();
  +            while (m.hasNext()) 
  +            {
  +               MessageReference message = (MessageReference)m.next();
  +               if (dest instanceof org.jboss.mq.SpyTopic)
  +               {
  +                       SpyMessage sm = message.getMessage();
  +                  sm.header.durableSubscriberID = 
((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID();
  +                  message.invalidate(); // since we did an update.
  +               }
  +               jmsDest.restoreMessage(message);
  +            } // end of while ()
  +
  +         }
  +      } // end of if ()
      }
   
      /**
  @@ -141,6 +339,7 @@
       * @param  server                      Description of Parameter
       * @exception  javax.jms.JMSException  Description of Exception
       */
  +   /*
      public void restore(JMSServer server) throws javax.jms.JMSException
      {
         //reconstruct TXs
  @@ -223,19 +422,40 @@
            }
         }
      }
  +   */
   
  +   public void initQueue(SpyDestination dest) throws javax.jms.JMSException
  +   {
  +      try
  +      {
  +         File logDir = new File(dataDirFile, dest.toString());
  +         MessageLog log = new MessageLog(messageCache, logDir);
  +         LogInfo info = new LogInfo(log, dest);
  +         synchronized (messageLogs)
  +         {
  +            messageLogs.put(dest.toString(), info);
  +         }
  +      }
  +      catch (Exception e)
  +      {
  +         javax.jms.JMSException newE = new javax.jms.JMSException("Invalid 
configuration.");
  +         newE.setLinkedException(e);
  +         throw newE;
  +      }
  +   }
      /**
       *  #Description of the Method
       *
       * @param  dest                        Description of Parameter
       * @exception  javax.jms.JMSException  Description of Exception
       */
  +   /*
      public void initQueue(SpyDestination dest) throws javax.jms.JMSException
      {
         try
         {
            File logDir = new File(dataDirFile, dest.toString());
  -         MessageLog log = new MessageLog(logDir);
  +         MessageLog log = new MessageLog(messageCache, logDir);
            LogInfo info = new LogInfo(log, dest);
            synchronized (messageLogs)
            {
  @@ -253,7 +473,7 @@
            throw newE;
         }
      }
  -
  +   */
      /**
       *  #Description of the Method
       *
  
  
  
  1.5       +3 -2      
jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManagerMBean.java
  
  Index: PersistenceManagerMBean.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManagerMBean.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- PersistenceManagerMBean.java      2001/09/01 03:00:59     1.4
  +++ PersistenceManagerMBean.java      2001/11/10 21:38:05     1.5
  @@ -7,16 +7,17 @@
   package org.jboss.mq.pm.file;
   
   import org.jboss.system.ServiceMBean;
  +import javax.management.ObjectName;
   
   /**
    *  <description>MBean interface for the JBossMQ JMX service.
    *
    * @author     Vincent Sheffer ([EMAIL PROTECTED])
    * @see        <related>
  - * @version    $Revision: 1.4 $
  + * @version    $Revision: 1.5 $
    */
   public interface PersistenceManagerMBean
  -       extends ServiceMBean
  +   extends ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean
   {
      /**
       *  Gets the DataDirectory attribute of the PersistenceManagerMBean object
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to