User: d_jencks
  Date: 01/11/10 13:38:05

  Modified:    src/main/org/jboss/mq/pm/rollinglogged
                        PersistenceManager.java
                        PersistenceManagerMBean.java SpyMessageLog.java
                        SpyTxLog.java
  Log:
  Changed mbean dependencies to work directly by mbean-references: eliminated depends 
tag from *service.xml files
  
  Revision  Changes    Path
  1.13      +293 -60   
jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java
  
  Index: PersistenceManager.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java,v
  retrieving revision 1.12
  retrieving revision 1.13
  diff -u -r1.12 -r1.13
  --- PersistenceManager.java   2001/10/29 07:32:26     1.12
  +++ PersistenceManager.java   2001/11/10 21:38:05     1.13
  @@ -6,8 +6,11 @@
    */
   package org.jboss.mq.pm.rollinglogged;
   
  +
  +
   import java.io.File;
   import java.net.URL;
  +import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Iterator;
  @@ -15,25 +18,28 @@
   import java.util.TreeSet;
   import javax.jms.JMSException;
   import javax.management.ObjectName;
  -
   import javax.naming.InitialContext;
   import org.jboss.mq.ConnectionToken;
  -
   import org.jboss.mq.SpyDestination;
   import org.jboss.mq.SpyJMSException;
   import org.jboss.mq.SpyMessage;
   import org.jboss.mq.pm.TxManager;
   import org.jboss.mq.server.JMSDestination;
  +import org.jboss.mq.server.JMSQueue;
   import org.jboss.mq.server.JMSServer;
  +import org.jboss.mq.server.JMSTopic;
  +import org.jboss.mq.server.PersistentQueue;
   import org.jboss.mq.xml.XElement;
   import org.jboss.system.ServiceMBeanSupport;
   import org.jboss.mq.server.MessageReference;
  +import org.jboss.mq.server.MessageCache;
   
   /**
    *  This class manages all persistence related services.
    *
    * @author     David Maplesden ([EMAIL PROTECTED])
  - * @version    $Revision: 1.12 $
  + * @author <a href="mailto:[EMAIL PROTECTED]";>David Jencks</a>
  + * @version    $Revision: 1.13 $
    */
   public class PersistenceManager extends ServiceMBeanSupport implements 
org.jboss.mq.pm.PersistenceManager, PersistenceManagerMBean
   {
  @@ -44,13 +50,16 @@
   
      protected static int MAX_POOL_SIZE = 50;
   
  +   private ObjectName messageCacheName;
  +   private MessageCache messageCache;
  +
      protected java.util.ArrayList listPool = new java.util.ArrayList();
      protected java.util.ArrayList txPool = new java.util.ArrayList();
   
      protected int messageCounter = 0;
      int numRollOvers = 0;
      HashMap queues = new HashMap();
  -   // Log file used to store commited transactions.
  +   // Log file used to store committed transactions.
      SpyTxLog currentTxLog;
      long nextTxId = Long.MIN_VALUE;
      // Maps txLogs to Maps of SpyDestinations to SpyMessageLogs
  @@ -65,6 +74,10 @@
   
      private String dataDirectory;
   
  +
  +   private HashMap unrestoredMessages = new HashMap();
  +
  +
      /**
       *  NewPersistenceManager constructor.
       *
  @@ -75,6 +88,21 @@
         txManager = new TxManager(this);
      }
   
  +   public ObjectName getMessageCache()
  +   {
  +      return messageCacheName;
  +   }
  +
  +   public void setMessageCache(ObjectName messageCache)
  +   {
  +      this.messageCacheName = messageCache;
  +   }
  +
  +   public MessageCache getMessageCacheInstance()
  +   {
  +      return messageCache;
  +   }
  +
   
      /**
       *  Insert the method's description here. Creation date: (6/27/2001 12:53:12
  @@ -98,6 +126,13 @@
         return dataDirectory;
      }
   
  +
  +   public org.jboss.mq.pm.PersistenceManager getInstance()
  +   {
  +      return this;
  +   }
  +
  +
      /**
       *  Gets the Name attribute of the PersistenceManager object
       *
  @@ -118,17 +153,28 @@
         return txManager;
      }
   
  +   /*public void initQueue(SpyDestination dest) throws javax.jms.JMSException
  +   {
  +      log.error("You called the wrong initQueue method");
  +   }
  +   */
      /**
       *  #Description of the Method
       *
       * @param  dest                        Description of Parameter
       * @exception  javax.jms.JMSException  Description of Exception
       */
  -   public void initQueue(SpyDestination dest) throws javax.jms.JMSException
  -   {
  -
  -      String key = "" + dest;
  -      queues.put(key, dest);
  +   // public void initQueue(JMSDestination jmsDest) throws javax.jms.JMSException
  +   //{
  +   //  restoreDestination(jmsDest);
  +      /*
  +      SpyDestination dest = jmsDest.getSpyDestination();
  +
  +      String destName = dest.toString();
  +      //really? what about multiple subscribers on a topic?
  +      queues.put(destName, dest);
  +      //new
  +      restoreDestination(jmsDest);
         SpyTxLog txLog = null;
   
         // if called before we have been started there is no need to setup log files
  @@ -146,15 +192,15 @@
            }
            synchronized (logs)
            {
  -            LogInfo logInfo = (LogInfo)logs.get(dest.toString());
  +            LogInfo logInfo = (LogInfo)logs.get(destName);
   
               if (logInfo == null)
               {
                  try
                  {
  -                  SpyMessageLog log = new SpyMessageLog(new File(dataDirFile, 
dest.toString() + ".dat" + numRollOvers));
  +                  SpyMessageLog log = new SpyMessageLog(new File(dataDirFile, 
destName + ".dat" + numRollOvers));
                     logInfo = new LogInfo(log, dest, currentTxLog);
  -                  logs.put("" + dest, logInfo);
  +                  logs.put(destName, logInfo);
                  }
                  catch (Exception e)
                  {
  @@ -165,9 +211,9 @@
               }
            }
         }
  +*/
  +   //   }
   
  -   }
  -
      /**
       *  #Description of the Method
       *
  @@ -182,7 +228,7 @@
            String key = "" + dest;
            queues.remove(key);
   
  -         SpyMessageLog log = null;
  +         LogInfo logInfo = null;
            HashMap logs;
            synchronized (messageLogs)
            {
  @@ -190,12 +236,13 @@
            }
            synchronized (logs)
            {
  -            log = (SpyMessageLog)logs.remove(key);
  +            logInfo = (LogInfo)logs.remove(key);
            }
  -         if (log == null)
  +         if (logInfo == null)
            {
               throw new SpyJMSException("The persistence log was never initialized");
            }
  +         SpyMessageLog log = logInfo.log;
            log.close();
            log.delete();
   
  @@ -207,12 +254,12 @@
                  logs = (HashMap)it.next();
                  synchronized (logs)
                  {
  -                  log = (SpyMessageLog)logs.remove(key);
  +                  logInfo = (LogInfo)logs.remove(key);
                  }
   
  -               if (log != null)
  +               if (logInfo != null)
                  {
  -                  deleteLogs.add(log);
  +                  deleteLogs.add(logInfo.log);
                  }
               }
            }
  @@ -230,6 +277,7 @@
         }
         catch (Exception e)
         {
  +         log.error("problem removing queue: " + dest, e);
            javax.jms.JMSException newE = new javax.jms.JMSException("Invalid 
configuration.");
            newE.setLinkedException(e);
            throw newE;
  @@ -238,18 +286,15 @@
      }
   
   
  +
      /**
  -    *  #Description of the Method
  +    * Describe <code>startService</code> method here.
       *
  -    * @exception  Exception  Description of Exception
  +    * @exception Exception if an error occurs
       */
  -   public void initService() throws Exception
  +   public void startService() throws Exception
      {
  -
  -      if (DEBUG)
  -      {
  -         System.out.println("Using new rolling logged persistence manager.");
  -      }
  +      log.debug("Using new rolling logged persistence manager.");
   
         File jbossHome = new File(System.getProperty("jboss.system.home"));
         dataDirFile = new File(jbossHome, dataDirectory);
  @@ -257,27 +302,16 @@
         if( !dataDirFile.isDirectory() )
            throw new Exception("The data directory is not valid: 
"+dataDirFile.getCanonicalPath());
   
  -      //Get an InitialContext
  -      JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[]{
  -            }, new String[]{
  -            });
  -      server.setPersistenceManager(this);
  -
  -   }
  +      messageCache = (MessageCache)getServer().invoke(messageCacheName, 
"getInstance", new Object[] {}, new String[] {});
   
  -   /**
  -    *  #Description of the Method
  -    *
  -    * @exception  Exception  Description of Exception
  -    */
  -   public void startService() throws Exception
  -   {
  -
  +      //read the transaction logs so we can read the queues as they come online.
  +      restoreTransactions();
  +      /*
         JMSServer server = (JMSServer)getServer().invoke(new 
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new 
Object[]{
               }, new String[]{
               });
         restore(server);
  -
  +*/
      }
   
      /**
  @@ -361,7 +395,7 @@
            releaseTx(txId);
            releaseTxInfo(info);
         }
  -      checkCleanup(info.log);
  +      checkCleanup();//info.log);
      }
   
      /**
  @@ -401,7 +435,18 @@
         synchronized (messageLogs)
         {
            HashMap logs = (HashMap)messageLogs.get(txLog);
  -         logInfo = (LogInfo)logs.get(message.getJMSDestination().toString());
  +         if (logs == null) 
  +         {
  +            log.error("keys for messageLogs are:");
  +            for (Iterator i = messageLogs.keySet().iterator(); i.hasNext();) 
  +            {
  +               log.error(i.next().toString());
  +            } // end of for ()
  +            
  +            throw new JMSException("no logs for this txLog: " + txLog);
  +         } // end of if ()
  +         String destName = message.getJMSDestination().toString();
  +         logInfo = (LogInfo)logs.get(destName);
         }
   
         if (logInfo == null)
  @@ -427,10 +472,165 @@
            {
               --logInfo.liveMessages;
            }
  -         checkCleanup(txLog);
  +         //checkCleanup(txLog); maybe only do this on rollover
         }
      }
   
  +   public void restoreTransactions() throws javax.jms.JMSException
  +   {
  +      TreeSet committedTxs = new TreeSet();
  +      HashMap txLogs = new HashMap();
  +      java.io.File dir = dataDirFile;
  +      java.io.File[] dataFiles = dir.listFiles();
  +
  +      for (int i = 0; i < dataFiles.length; ++i)
  +      {
  +         String name = dataFiles[i].getName();
  +         if (name.startsWith(TRANS_FILE_NAME))
  +         {
  +            int index = name.indexOf(".dat");
  +            if (index < 0)
  +            {
  +               continue;
  +            }
  +            String sRollOver = name.substring(index + 4);
  +            int rollOver = Integer.parseInt(sRollOver);
  +            numRollOvers = Math.max(numRollOvers, rollOver);
  +            SpyTxLog txLog = new SpyTxLog(dataFiles[i]);
  +            txLog.restore(committedTxs);
  +            txLogs.put(new Integer(rollOver), txLog);
  +            messageLogs.put(txLog, new HashMap());
  +         }
  +      }
  +
  +      if (!committedTxs.isEmpty())
  +      {
  +         nextTxId = ((org.jboss.mq.pm.Tx)committedTxs.last()).longValue();
  +      }
  +      //now "pre-restore" message logs
  +      for (int i = 0; i < dataFiles.length; ++i)
  +      {
  +         //message log names look like <queuename>.dat<rollovercounter>
  +         //4 = length(".dat");
  +         String name = dataFiles[i].getName();
  +         int index = name.indexOf(".dat");
  +         if (index < 0)
  +         {
  +            continue;
  +         }
  +         String sRollOver = name.substring(index + 4);
  +         int rollOver = Integer.parseInt(sRollOver);
  +         //key is retrieved queue name.
  +         String key = name.substring(0, name.length() - (sRollOver.length() + 4));
  +         if (!name.startsWith(TRANS_FILE_NAME))
  +         {
  +            HashMap messages = (HashMap)unrestoredMessages.get(key);
  +            if (messages == null) 
  +            {
  +               messages = new HashMap();
  +               unrestoredMessages.put(key, messages);
  +            } // end of if ()
  +            
  +            SpyMessageLog messageLog = new SpyMessageLog(messageCache, 
dataFiles[i]);
  +            SpyTxLog txLog = (SpyTxLog)txLogs.get(new Integer(rollOver));
  +            if (txLog == null) 
  +            {
  +               log.warn("no transaction log for message log " + dataFiles[i]);
  +               continue;
  +            } // end of if ()            
  +            LogInfo info = new LogInfo(messageLog, null, txLog);
  +            messageLog.restore(committedTxs, info, messages);
  +            HashMap logs = (HashMap)messageLogs.get(txLog);
  +            logs.put(key, info);
  +            unrestoredMessages.put(key, messages);
  +         }
  +      }
  +      //set up rolled over logs for new transactions.
  +      rollOverLogs();
  +   }
  +
  +
  +   public void restoreDestination(JMSDestination jmsDest) throws 
javax.jms.JMSException
  +   {
  +      if (jmsDest instanceof JMSQueue) 
  +      {
  +         SpyDestination spyDest = jmsDest.getSpyDestination();
  +         restoreQueue(jmsDest, spyDest);
  +      } // end of if ()
  +      else if (jmsDest instanceof JMSTopic) 
  +      {
  +         ArrayList persistQList = ((JMSTopic)jmsDest).getPersistentQueues();
  +         Iterator pq = persistQList.iterator();
  +         while (pq.hasNext()) 
  +         {
  +            SpyDestination spyDest = 
((PersistentQueue)pq.next()).getSpyDestination();
  +
  +            restoreQueue(jmsDest, spyDest);
  +
  +         } // end of while ()
  +         
  +      } // end of if ()
  +      
  +      //now see if we have restored all the preexisting queues
  +
  +      if (unrestoredMessages.isEmpty()) 
  +      {
  +         checkCleanup(); 
  +      } // end of if () 
  +   }
  +
  +   public void restoreQueue(JMSDestination jmsDest, SpyDestination dest)
  +      throws JMSException
  +   {
  +
  +      //remember this queue
  +      String queueName = dest.toString();
  +      queues.put(queueName, dest);
  +      //set the info.destination on all the logInfos for this queue
  +      Iterator txLogIt = messageLogs.keySet().iterator();
  +      while (txLogIt.hasNext()) 
  +      {
  +         SpyTxLog txLog = (SpyTxLog)txLogIt.next();
  +         HashMap logs = (HashMap)messageLogs.get(txLog);
  +         LogInfo info = (LogInfo)logs.get(queueName);
  +         if (info != null) 
  +         {
  +            info.destination = dest;
  +         } // end of if ()
  +
  +      } // end of while ()
  +      //restore the messages from old logs (previously read into unrestoredMessages)
  +      HashMap messages = (HashMap)unrestoredMessages.remove(queueName);
  +      if (messages != null) 
  +      {
  +         
  +      
  +         synchronized (jmsDest)
  +         {
  +            Iterator m = messages.values().iterator();
  +            while (m.hasNext()) 
  +            {
  +               //SpyMessage message = (SpyMessage)m.next();
  +               MessageReference message = (MessageReference)m.next();
  +               if (dest instanceof org.jboss.mq.SpyTopic)
  +               {
  +                       SpyMessage sm = message.getMessage();
  +                  sm.header.durableSubscriberID = 
((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID();
  +                  message.invalidate(); // since we did an update.
  +                  //message.durableSubscriberID = 
((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID();
  +               }
  +               jmsDest.restoreMessage(message);
  +            } // end of while ()
  +
  +         }
  +      } // end of if ()
  +      //set up new log file for restored (or new) queue
  +      synchronized (messageLogs)
  +      {
  +         HashMap logs = (HashMap)messageLogs.get(currentTxLog);
  +         logs.put(queueName, newQueueInfo(dest, currentTxLog));
  +      }
  +   }
      /**
       *  #Description of the Method
       *
  @@ -439,8 +639,9 @@
       */
      public void restore(org.jboss.mq.server.JMSServer server) throws 
javax.jms.JMSException
      {
  +   }/*
   
  -      TreeSet commitedTxs = new TreeSet();
  +      TreeSet committedTxs = new TreeSet();
         HashMap txLogs = new HashMap();
         java.io.File dir = dataDirFile;
         java.io.File[] dataFiles = dir.listFiles();
  @@ -459,15 +660,15 @@
               int rollOver = Integer.parseInt(sRollOver);
               numRollOvers = Math.max(numRollOvers, rollOver + 1);
               SpyTxLog txLog = new SpyTxLog(dataFiles[i]);
  -            txLog.restore(commitedTxs);
  +            txLog.restore(committedTxs);
               txLogs.put(new Integer(rollOver), txLog);
               messageLogs.put(txLog, new HashMap());
            }
         }
   
  -      if (!commitedTxs.isEmpty())
  +      if (!committedTxs.isEmpty())
         {
  -         nextTxId = ((org.jboss.mq.pm.Tx)commitedTxs.last()).longValue();
  +         nextTxId = ((org.jboss.mq.pm.Tx)committedTxs.last()).longValue();
         }
   
         for (int i = 0; i < dataFiles.length; ++i)
  @@ -488,7 +689,12 @@
                  continue;
               String key = name.substring(0, name.length() - (sRollOver.length() + 
4));
               SpyMessageLog messageLog = new SpyMessageLog(dataFiles[i]);
  +<<<<<<< PersistenceManager.java
  +            SpyMessage[] messages = messageLog.restore(committedTxs);
  +            SpyTxLog txLog = (SpyTxLog)txLogs.get(new Integer(rollOver));
  +=======
               MessageReference[] messages = messageLog.restore(commitedTxs);
  +>>>>>>> 1.12
               SpyDestination dest = (SpyDestination)queues.get(key);
               if (dest != null)
               {
  @@ -551,7 +757,7 @@
            throw newE;
         }
      }
  -
  +   */
      /**
       *  #Description of the Method
       *
  @@ -574,7 +780,7 @@
            releaseTx(txId);
            releaseTxInfo(info);
         }
  -      checkCleanup(info.log);
  +      //checkCleanup(info.log);maybe only on rollover
      }
   
      protected org.jboss.mq.pm.Tx getTx(long value)
  @@ -631,7 +837,7 @@
            {
               --info.liveMessages;
            }
  -         checkCleanup(info.txLog);
  +         //checkCleanup(info.txLog);maybe only on rollover
         }
      }
   
  @@ -666,10 +872,11 @@
   
            for (Iterator it = queues.values().iterator(); it.hasNext(); )
            {
  -            SpyDestination dest = (SpyDestination)it.next();
  -            SpyMessageLog log = new SpyMessageLog(new File(dataDirFile, 
dest.toString() + ".dat" + numRollOvers));
  -            LogInfo logInfo = new LogInfo(log, dest, newTxLog);
  -            logs.put("" + dest, logInfo);
  +            SpyDestination spyDest = (SpyDestination)it.next();
  +            //String destName = spyDest.toString();
  +            // SpyMessageLog log = new SpyMessageLog(new File(dataDirFile, destName 
+ ".dat" + numRollOvers));
  +            //LogInfo logInfo = new LogInfo(log, dest, newTxLog);
  +            logs.put(spyDest.toString(), newQueueInfo(spyDest, newTxLog));
            }
            SpyTxLog oldLog = currentTxLog;
            synchronized (messageLogs)
  @@ -677,7 +884,7 @@
               currentTxLog = newTxLog;
               messageLogs.put(newTxLog, logs);
            }
  -         checkCleanup(oldLog);
  +         checkCleanup();//oldLog);
         }
         catch (Exception e)
         {
  @@ -687,9 +894,35 @@
         }
      }
   
  +   protected LogInfo newQueueInfo(SpyDestination spyDest, SpyTxLog txLog) throws 
JMSException
  +   {
  +      try 
  +      {
  +         String destName = spyDest.toString();
  +         SpyMessageLog log = new SpyMessageLog(messageCache, new File(dataDirFile, 
destName + ".dat" + numRollOvers));
  +         return new LogInfo(log, spyDest, txLog);
  +      } 
  +      catch (Exception e) 
  +      {
  +         JMSException jme = new SpyJMSException("Error rolling over log to new file 
for dest: " + spyDest);
  +         jme.setLinkedException(e);
  +         throw jme;
  +      } // end of try-catch
  +      
  +   }
  +
  +   protected void checkCleanup() throws JMSException
  +   {
  +      Iterator logs = new ArrayList(messageLogs.keySet()).iterator();
  +      while (logs.hasNext()) 
  +      {
  +         checkCleanup((SpyTxLog)logs.next());
  +      } // end of while ()
  +   }
  +
      protected void checkCleanup(SpyTxLog txLog) throws JMSException
      {
  -      if (txLog == currentTxLog)
  +      if (txLog == null || txLog == currentTxLog)
         {
            return;
         }
  
  
  
  1.5       +4 -3      
jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManagerMBean.java
  
  Index: PersistenceManagerMBean.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManagerMBean.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- PersistenceManagerMBean.java      2001/09/01 03:01:00     1.4
  +++ PersistenceManagerMBean.java      2001/11/10 21:38:05     1.5
  @@ -6,6 +6,7 @@
    */
   package org.jboss.mq.pm.rollinglogged;
   
  +import javax.management.ObjectName;
   import org.jboss.system.ServiceMBean;
   
   /**
  @@ -13,12 +14,11 @@
    *
    * @author     Vincent Sheffer ([EMAIL PROTECTED])
    * @see        <related>
  - * @version    $Revision: 1.4 $
  + * @version    $Revision: 1.5 $
    */
   public interface PersistenceManagerMBean
  -       extends ServiceMBean
  +   extends ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean
   {
  -   public final static String OBJECT_NAME = ":service=JBossMQ";
   
      /**
       *  Gets the DataDirectory attribute of the PersistenceManagerMBean object
  @@ -33,4 +33,5 @@
       * @param  newDataDirectory  The new DataDirectory value
       */
      public void setDataDirectory(java.lang.String newDataDirectory);
  +
   }
  
  
  
  1.6       +47 -16    
jbossmq/src/main/org/jboss/mq/pm/rollinglogged/SpyMessageLog.java
  
  Index: SpyMessageLog.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/SpyMessageLog.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- SpyMessageLog.java        2001/10/28 04:07:35     1.5
  +++ SpyMessageLog.java        2001/11/10 21:38:05     1.6
  @@ -6,12 +6,16 @@
    */
   package org.jboss.mq.pm.rollinglogged;
   
  +
  +import java.io.File;
   import java.io.IOException;
   import java.io.Serializable;
  -import java.io.File;
  +import java.util.HashMap;
   import javax.jms.JMSException;
   import org.jboss.mq.SpyJMSException;
  +import org.jboss.mq.server.JMSServer;
   import org.jboss.mq.server.MessageReference;
  +import org.jboss.mq.server.MessageCache;
   
   import org.jboss.mq.SpyMessage;
   
  @@ -20,9 +24,9 @@
    *  log can be used reconstruct the queue in case of provider failure. Integrety
    *  is kept by the use of an ObjectIntegrityLog.
    *
  - * @created    August 16, 2001
    * @author:    Hiram Chirino ([EMAIL PROTECTED])
  - * @version    $Revision: 1.5 $
  + * @author <a href="mailto:[EMAIL PROTECTED]";>David Jencks</a>
  + * @version    $Revision: 1.6 $
    */
   public class SpyMessageLog {
   
  @@ -31,11 +35,19 @@
      /////////////////////////////////////////////////////////////////////
      private IntegrityLog transactionLog;
   
  +   private final MessageCache cache;
  +
      /////////////////////////////////////////////////////////////////////
      // Constructor
      /////////////////////////////////////////////////////////////////////
  -   public SpyMessageLog( File file )
  +   SpyMessageLog(MessageCache cache, File file )
         throws JMSException {
  +      if (cache == null) 
  +      {
  +         throw new IllegalArgumentException("must supply a cache!");
  +      } // end of if ()
  +      
  +      this.cache = cache;
         try {
            transactionLog = new IntegrityLog( file );
         } catch ( IOException e ) {
  @@ -66,12 +78,16 @@
      }
   
   
  -   public synchronized MessageReference[] restore( java.util.TreeSet commited )
  +   public synchronized void restore( java.util.TreeSet committed, 
PersistenceManager.LogInfo info, HashMap messages)
         throws JMSException {
   
  -      java.util.HashMap messageIndex = new java.util.HashMap();
  -      org.jboss.mq.server.MessageCache cache = 
org.jboss.mq.server.JMSServer.getInstance().getMessageCache();
  +      //<<<<<<< SpyMessageLog.java
  +      //java.util.HashMap messageIndex = new java.util.HashMap();
  +           //info.liveMessages = 0;
  +      //=======
  +      //java.util.HashMap messageIndex = new java.util.HashMap();
         
  +      //>>>>>>> 1.5
         try {
            java.util.LinkedList objects = transactionLog.toIndex();
   
  @@ -83,31 +99,41 @@
                  IntegrityLog.MessageAddedRecord r = ( 
IntegrityLog.MessageAddedRecord )o;
                  r.message.header.messageId = r.messageId;
   
  -               if ( r.isTransacted && !commited.contains( new org.jboss.mq.pm.Tx( 
r.transactionId ) ) ) {
  +               if ( r.isTransacted && !committed.contains( new org.jboss.mq.pm.Tx( 
r.transactionId ) ) ) {
                     // the TX this message was part of was not
  -                  // commited... so drop this message
  +                  // committed... so drop this message
                     continue;
                  }
  -
                  MessageReference mr = cache.add(r.message);
  -               messageIndex.put( new Long( r.messageId ), mr );
  +               mr.persistData = info;
  +               messages.put(new Long(r.messageId), mr);
  +               info.liveMessages++;
   
  +
               } else if ( o instanceof IntegrityLog.MessageRemovedRecord ) {
   
                  IntegrityLog.MessageRemovedRecord r = ( 
IntegrityLog.MessageRemovedRecord )o;
   
  -               if ( r.isTransacted && !commited.contains( new org.jboss.mq.pm.Tx( 
r.transactionId ) ) ) {
  -                  // the TX this message was part of was not
  -                  // commited... so drop this message
  +               if ( r.isTransacted && !committed.contains( new org.jboss.mq.pm.Tx( 
r.transactionId ) ) ) {
  +                  // the TX this message read was part of was not
  +                  // committed... so keep this message
                     continue;
                  }
  +               //<<<<<<< SpyMessageLog.java
  +               MessageReference mr = (MessageReference)messages.remove(new 
Long(r.messageId));                  
  +               if( mr != null )
  +                  cache.remove(mr);   
  +               //messageIndex.remove( new Long( r.messageId ) );
  +               info.liveMessages--;
  +               /*=======
   
                  Long txid = new Long( r.messageId );  
                  MessageReference mr = (MessageReference)messageIndex.get(txid );
                  messageIndex.remove(txid );
                  if( mr != null )
                     cache.remove(mr);   
  -
  +               */
  +               //>>>>>>> 1.5
               }
            }
         } catch ( Exception e ) {
  @@ -115,12 +141,17 @@
            throwJMSException( "Could not rebuild the queue from the queue's 
tranaction log.", e );
         }
   
  +//<<<<<<< SpyMessageLog.java
  +      /*
  +      SpyMessage rc[] = new SpyMessage[messageIndex.size()];
  +=======
         MessageReference rc[] = new MessageReference[messageIndex.size()];
  +>>>>>>> 1.5
         java.util.Iterator iter = messageIndex.values().iterator();
         for ( int i = 0; iter.hasNext(); i++ ) {
            rc[i] = (MessageReference)iter.next();
         }
  -      return rc;
  +      return rc;*/
      }
   
      public synchronized void add( SpyMessage message, org.jboss.mq.pm.Tx 
transactionId )
  
  
  
  1.4       +4 -2      jbossmq/src/main/org/jboss/mq/pm/rollinglogged/SpyTxLog.java
  
  Index: SpyTxLog.java
  ===================================================================
  RCS file: 
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/SpyTxLog.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- SpyTxLog.java     2001/09/04 02:22:29     1.3
  +++ SpyTxLog.java     2001/11/10 21:38:05     1.4
  @@ -18,7 +18,7 @@
    *
    * @created    August 16, 2001
    * @author:    Hiram Chirino ([EMAIL PROTECTED])
  - * @version    $Revision: 1.3 $
  + * @version    $Revision: 1.4 $
    */
   public class SpyTxLog {
   
  @@ -32,13 +32,15 @@
      /////////////////////////////////////////////////////////////////////
      // Constructors
      /////////////////////////////////////////////////////////////////////
  -   public SpyTxLog( File file )
  +   SpyTxLog( File file )
         throws JMSException {
         try {
            transactionLog = new IntegrityLog( file );
         } catch ( IOException e ) {
            throwJMSException( "Could not open the queue's tranaction log: " + 
file.getAbsolutePath(), e );
         }
  +      System.out.println("created SpyTxLog: " + this);
  +      new Exception().printStackTrace();
      }
   
      /////////////////////////////////////////////////////////////////////
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to