User: d_jencks Date: 01/11/10 13:38:05 Modified: src/main/org/jboss/mq/pm/file MessageLog.java PersistenceManager.java PersistenceManagerMBean.java Log: Changed mbean dependencies to work directly by mbean-references: eliminated depends tag from *service.xml files Revision Changes Path 1.6 +27 -13 jbossmq/src/main/org/jboss/mq/pm/file/MessageLog.java Index: MessageLog.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/MessageLog.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- MessageLog.java 2001/10/28 04:07:34 1.5 +++ MessageLog.java 2001/11/10 21:38:04 1.6 @@ -5,20 +5,23 @@ * See terms of license at gnu.org. */ package org.jboss.mq.pm.file; + + + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; - import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; - +import java.util.Map; +import java.util.TreeMap; import javax.jms.JMSException; -import org.jboss.mq.server.MessageReference ; -import org.jboss.mq.server.JMSServer; - import org.jboss.mq.*; +import org.jboss.mq.server.JMSServer; +import org.jboss.mq.server.MessageCache; +import org.jboss.mq.server.MessageReference ; /** * This is used to keep SpyMessages on the disk and is used reconstruct the @@ -26,7 +29,7 @@ * * @created August 16, 2001 * @author: Paul Kendall ([EMAIL PROTECTED]) - * @version $Revision: 1.5 $ + * @version $Revision: 1.6 $ */ public class MessageLog { @@ -35,6 +38,8 @@ ///////////////////////////////////////////////////////////////////// private File queueName; + private MessageCache messageCache; + ///////////////////////////////////////////////////////////////////// // Constants ///////////////////////////////////////////////////////////////////// @@ -49,8 +54,15 @@ ///////////////////////////////////////////////////////////////////// // Constructor ///////////////////////////////////////////////////////////////////// - public MessageLog( File file ) - throws JMSException { + public MessageLog(MessageCache messageCache, File file ) + + { + if (messageCache == null) + { + throw new IllegalArgumentException("Need a MessageCache to construct a MessageLog!"); + } // end of if () + + this.messageCache = messageCache; queueName = file; queueName.mkdirs(); } @@ -63,10 +75,10 @@ } - public MessageReference[] restore( java.util.TreeSet rollBackTXs ) + public Map restore( java.util.TreeSet rollBackTXs ) throws JMSException { //use sorted map to get queue order right. - java.util.TreeMap messageIndex = new java.util.TreeMap(); + TreeMap messageIndex = new TreeMap(); try { File[] files = queueName.listFiles(); @@ -89,13 +101,15 @@ } catch ( Exception e ) { throwJMSException( "Could not rebuild the queue from the queue's tranaction log.", e ); } - + return messageIndex; + /* MessageReference rc[] = new MessageReference[messageIndex.size()]; java.util.Iterator iter = messageIndex.values().iterator(); for ( int i = 0; iter.hasNext(); i++ ) { rc[i] = ( MessageReference )iter.next(); } return rc; + */ } public void add( MessageReference messageRef, org.jboss.mq.pm.Tx transactionId ) @@ -206,7 +220,7 @@ out.close(); } - protected void restoreMessageFromFile( java.util.TreeMap store, File file ) + protected void restoreMessageFromFile(TreeMap store, File file ) throws Exception { ObjectInputStream in = new ObjectInputStream( new FileInputStream( file ) ); long msgId = in.readLong(); @@ -238,7 +252,7 @@ in.close(); message.header.messageId = msgId; - MessageReference mr = JMSServer.getInstance().getMessageCache().add(message); + MessageReference mr = messageCache.add(message); mr.persistData = file; store.put( new Long( msgId ), mr ); } 1.9 +243 -23 jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManager.java Index: PersistenceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManager.java,v retrieving revision 1.8 retrieving revision 1.9 diff -u -r1.8 -r1.9 --- PersistenceManager.java 2001/10/28 04:07:34 1.8 +++ PersistenceManager.java 2001/11/10 21:38:04 1.9 @@ -5,44 +5,55 @@ * See terms of license at gnu.org. */ package org.jboss.mq.pm.file; + + + + + + + + + import java.io.File; import java.io.IOException; - import java.net.URL; +import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; +import java.util.Map; import java.util.TreeSet; - import javax.jms.JMSException; - - - - - import javax.management.*; - import org.jboss.mq.SpyDestination; import org.jboss.mq.SpyJMSException; import org.jboss.mq.SpyMessage; +import org.jboss.mq.SpyMessage; import org.jboss.mq.pm.TxManager; import org.jboss.mq.server.JMSDestination; +import org.jboss.mq.server.JMSQueue; import org.jboss.mq.server.JMSServer; -import org.jboss.system.ServiceMBeanSupport; - -import org.jboss.mq.SpyMessage; +import org.jboss.mq.server.JMSTopic; +import org.jboss.mq.server.PersistentQueue; import org.jboss.mq.server.MessageReference; +import org.jboss.mq.server.MessageCache; +import org.jboss.system.ServiceMBeanSupport; /** * This class manages all persistence related services for file based * persistence. * * @author Paul Kendall ([EMAIL PROTECTED]) - * @version $Revision: 1.8 $ + * @version $Revision: 1.9 $ */ public class PersistenceManager extends ServiceMBeanSupport implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager { protected final static int MAX_POOL_SIZE = 50; + + + private ObjectName messageCacheName; + private MessageCache messageCache; + protected java.util.ArrayList txPool = new java.util.ArrayList(); protected long tidcounter = Long.MIN_VALUE; @@ -56,6 +67,8 @@ HashMap messageLogs = new HashMap(); // Maps (Long)txIds to LinkedList of AddFile tasks HashMap transactedTasks = new HashMap(); + //Holds unrestored messages read from queues, indexed by queue name. + Map unrestoredMessages = new HashMap(); /** * PersistenceManager constructor. @@ -67,6 +80,26 @@ txManager = new TxManager(this); } + public org.jboss.mq.pm.PersistenceManager getInstance() + { + return this; + } + + public ObjectName getMessageCache() + { + return messageCacheName; + } + + public void setMessageCache(ObjectName messageCache) + { + this.messageCacheName = messageCache; + } + + public MessageCache getMessageCacheInstance() + { + return messageCache; + } + /** * Sets the DataDirectory attribute of the PersistenceManager object * @@ -107,32 +140,197 @@ return txManager; } - /** * #Description of the Method * * @exception Exception Description of Exception */ - public void initService() throws Exception + public void startService() throws Exception { File jbossHome = new File(System.getProperty("jboss.system.home")); dataDirFile = new File(jbossHome, dataDirectory); dataDirFile.mkdirs(); if( !dataDirFile.isDirectory() ) throw new Exception("The data directory is not valid: "+dataDirFile.getCanonicalPath()); - JMSServer server = (JMSServer)getServer().invoke(new ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[]{}, new String[]{}); - server.setPersistenceManager(this); + messageCache = (MessageCache)getServer().invoke(messageCacheName, "getInstance", new Object[] {}, new String[] {}); + + restoreTransactions(); + } /** - * #Description of the Method + * The <code>restoreTransactions</code> method is called when the + * PersistenceManager service is started. It reads all transaction log + * files, and pre-restores all messages that are committed and not read. + * When a queue or topic is started, it will collect these pre-restored + * messages and add them to its in memory queue. * - * @exception Exception Description of Exception + * @exception javax.jms.JMSException if an error occurs */ - public void startService() throws Exception + private void restoreTransactions() throws javax.jms.JMSException + { + //reconstruct TXs + TreeSet txs = new TreeSet(); + File[] transactFiles = dataDirFile.listFiles(); + if(transactFiles != null) + { + for (int i = 0; i < transactFiles.length; i++) + { + // Set up a messageLog for each queue data directory. + if( transactFiles[i].isDirectory() ) + { + String dirName = transactFiles[i].toString(); + int start = transactFiles[i].getParent().length(); + String key = dirName.substring(start); + MessageLog log = new MessageLog(messageCache, transactFiles[i]); + LogInfo info = new LogInfo(log, null); + synchronized (messageLogs) + { + messageLogs.put(key, info); + } + transactFiles[i] = null; + continue; + } + + try + { + Long tx = new Long(Long.parseLong(transactFiles[i].getName())); + java.util.ArrayList removingMessages = readTxFile(transactFiles[i]); + if (testRollBackTx(tx, removingMessages)) + { + txs.add(tx); + } + } + catch (NumberFormatException e) + { + log.warn("Ignoring invalid transaction record file " + transactFiles[i].getAbsolutePath()); + transactFiles[i] = null; + } + catch (IOException e) + { + JMSException jmse = new SpyJMSException("IO Error when restoring."); + jmse.setLinkedException(e); + throw jmse; + } + } + } + if (!txs.isEmpty()) + { + this.tidcounter = ((Long)txs.last()).longValue() + 1; + } + + HashMap clone; + synchronized (messageLogs) + { + clone = (HashMap)messageLogs.clone(); + } + + for (Iterator i = clone.keySet().iterator(); i.hasNext();) + { + Object key = i.next(); + LogInfo logInfo = (LogInfo)clone.get(key); + unrestoredMessages.put(key, logInfo.log.restore(txs)); + } + + //all txs now committed or rolled back so delete tx files + if(transactFiles != null) + { + for (int i = 0; i < transactFiles.length; i++) + { + if (transactFiles[i] != null) + { + deleteTxFile(transactFiles[i]); + } + } + } + + } + + /** + * The <code>restoreDestination</code> method is called by a queue or + * topic on startup. The method sends all the pre-restored messages to + * the JMSDestination to get them back into the in-memory queue. + * + * @param jmsDest a <code>JMSDestination</code> value + * @exception javax.jms.JMSException if an error occurs + */ + public void restoreDestination(JMSDestination jmsDest) throws javax.jms.JMSException + { + if (jmsDest instanceof JMSQueue) + { + SpyDestination spyDest = jmsDest.getSpyDestination(); + restoreQueue(jmsDest, spyDest); + } // end of if () + else if (jmsDest instanceof JMSTopic) + { + Collection persistQList = ((JMSTopic)jmsDest).getPersistentQueues(); + Iterator pq = persistQList.iterator(); + while (pq.hasNext()) + { + SpyDestination spyDest = ((PersistentQueue)pq.next()).getSpyDestination(); + + restoreQueue(jmsDest, spyDest); + + } // end of while () + + } // end of if () + + } + + /** + * The <code>restoreQueue</code> method restores the messages for + * a SpyDestination to its queue by sending them to the associated + * JMSDestination. + * + * @param jmsDest a <code>JMSDestination</code> value + * @param dest a <code>SpyDestination</code> value + * @exception JMSException if an error occurs + */ + public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) + throws JMSException { - JMSServer server = (JMSServer)getServer().invoke(new ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new Object[]{}, new String[]{}); - restore(server); + + //remember this queue + String queueName = dest.toString(); + LogInfo info = (LogInfo)messageLogs.get(queueName); + if (info == null) + { + //must be new, set up directory etc. + File logDir = new File(dataDirFile, queueName); + MessageLog log = new MessageLog(messageCache, logDir); + info = new LogInfo(log, dest); + synchronized (messageLogs) + { + messageLogs.put(queueName, info); + } + + } // end of if () + else + { + info.destination = dest; + } // end of else + + //restore the messages from old logs (previously read into unrestoredMessages) + Map messages = (Map)unrestoredMessages.remove(queueName); + if (messages != null) + { + synchronized (jmsDest) + { + Iterator m = messages.values().iterator(); + while (m.hasNext()) + { + MessageReference message = (MessageReference)m.next(); + if (dest instanceof org.jboss.mq.SpyTopic) + { + SpyMessage sm = message.getMessage(); + sm.header.durableSubscriberID = ((org.jboss.mq.SpyTopic)dest).getDurableSubscriptionID(); + message.invalidate(); // since we did an update. + } + jmsDest.restoreMessage(message); + } // end of while () + + } + } // end of if () } /** @@ -141,6 +339,7 @@ * @param server Description of Parameter * @exception javax.jms.JMSException Description of Exception */ + /* public void restore(JMSServer server) throws javax.jms.JMSException { //reconstruct TXs @@ -223,19 +422,40 @@ } } } + */ + public void initQueue(SpyDestination dest) throws javax.jms.JMSException + { + try + { + File logDir = new File(dataDirFile, dest.toString()); + MessageLog log = new MessageLog(messageCache, logDir); + LogInfo info = new LogInfo(log, dest); + synchronized (messageLogs) + { + messageLogs.put(dest.toString(), info); + } + } + catch (Exception e) + { + javax.jms.JMSException newE = new javax.jms.JMSException("Invalid configuration."); + newE.setLinkedException(e); + throw newE; + } + } /** * #Description of the Method * * @param dest Description of Parameter * @exception javax.jms.JMSException Description of Exception */ + /* public void initQueue(SpyDestination dest) throws javax.jms.JMSException { try { File logDir = new File(dataDirFile, dest.toString()); - MessageLog log = new MessageLog(logDir); + MessageLog log = new MessageLog(messageCache, logDir); LogInfo info = new LogInfo(log, dest); synchronized (messageLogs) { @@ -253,7 +473,7 @@ throw newE; } } - + */ /** * #Description of the Method * 1.5 +3 -2 jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManagerMBean.java Index: PersistenceManagerMBean.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/file/PersistenceManagerMBean.java,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- PersistenceManagerMBean.java 2001/09/01 03:00:59 1.4 +++ PersistenceManagerMBean.java 2001/11/10 21:38:05 1.5 @@ -7,16 +7,17 @@ package org.jboss.mq.pm.file; import org.jboss.system.ServiceMBean; +import javax.management.ObjectName; /** * <description>MBean interface for the JBossMQ JMX service. * * @author Vincent Sheffer ([EMAIL PROTECTED]) * @see <related> - * @version $Revision: 1.4 $ + * @version $Revision: 1.5 $ */ public interface PersistenceManagerMBean - extends ServiceMBean + extends ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean { /** * Gets the DataDirectory attribute of the PersistenceManagerMBean object
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development