User: starksm Date: 01/12/18 12:52:05 Modified: src/main/org/jboss/mq/pm/rollinglogged Tag: Branch_2_4 PersistenceManager.java Log: Added ability to set size at which the log rolls over via the RollOverSize attribute. Use org.jboss.logging.Logger for output instead of System.out/.err Revision Changes Path No revision No revision 1.3.2.2 +276 -133 jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java Index: PersistenceManager.java =================================================================== RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/rollinglogged/PersistenceManager.java,v retrieving revision 1.3.2.1 retrieving revision 1.3.2.2 diff -u -r1.3.2.1 -r1.3.2.2 --- PersistenceManager.java 2001/08/23 03:57:11 1.3.2.1 +++ PersistenceManager.java 2001/12/18 20:52:05 1.3.2.2 @@ -1,3 +1,9 @@ +/* + * JBossMQ, the OpenSource JMS implementation + * + * Distributable under LGPL license. + * See terms of license at gnu.org. + */ package org.jboss.mq.pm.rollinglogged; import java.net.URL; @@ -8,7 +14,7 @@ import java.util.TreeSet; import javax.jms.JMSException; -import javax.management.*; +import javax.management.ObjectName; import javax.naming.InitialContext; import org.jboss.mq.ConnectionToken; @@ -23,15 +29,19 @@ import org.jboss.util.ServiceMBeanSupport; +import org.jboss.logging.Logger; + /** * This class manages all persistence related services. * * @author David Maplesden ([EMAIL PROTECTED]) * @created August 16, 2001 - * @version $Revision: 1.3.2.1 $ + * @version $Revision: 1.3.2.2 $ */ -public class PersistenceManager extends org.jboss.util.ServiceMBeanSupport implements org.jboss.mq.pm.PersistenceManager, PersistenceManagerMBean { - +public class PersistenceManager + extends org.jboss.util.ServiceMBeanSupport + implements org.jboss.mq.pm.PersistenceManager, PersistenceManagerMBean +{ protected java.util.ArrayList listPool = new java.util.ArrayList(); protected java.util.ArrayList txPool = new java.util.ArrayList(); @@ -50,12 +60,13 @@ // The directory where persistence data should be stored URL dataDirURL; TxManager txManager; + + static Logger log = Logger.getLogger( PersistenceManager.class ); - private String dataDirectory; + private String dataDirectory; + private int rollOverSize = 1000; - public final static int ROLL_OVER_SIZE = 1000; public final static String TRANS_FILE_NAME = "transactions.dat"; - public final static boolean DEBUG = false; protected static int MAX_POOL_SIZE = 50; @@ -65,18 +76,19 @@ * @exception javax.jms.JMSException Description of Exception */ public PersistenceManager() - throws javax.jms.JMSException { + throws javax.jms.JMSException + { txManager = new TxManager( this ); } - /** * Insert the method's description here. Creation date: (6/27/2001 12:53:12 * AM) * * @param newDataDirectory java.lang.String */ - public void setDataDirectory( java.lang.String newDataDirectory ) { + public void setDataDirectory( String newDataDirectory ) + { dataDirectory = newDataDirectory; } @@ -86,50 +98,72 @@ * * @return java.lang.String */ - public java.lang.String getDataDirectory() { + public String getDataDirectory() + { return dataDirectory; } - public String getName() { - return "JBossMQ-PersistenceManager"; + public void setRollOverSize( int rollOverSize ) + { + this.rollOverSize = rollOverSize; } + public int getRollOverSize() + { + return rollOverSize; + } + + public String getName() + { + return "JBossMQ-RollingLogged-PersistenceManager"; + } + /** * getTxManager method comment. * * @return The TxManager value */ - public org.jboss.mq.pm.TxManager getTxManager() { + public org.jboss.mq.pm.TxManager getTxManager() + { return txManager; } public void initQueue( SpyDestination dest ) - throws javax.jms.JMSException { + throws javax.jms.JMSException + { String key = "" + dest; queues.put( key, dest ); SpyTxLog txLog = null; - if ( messageLogs == null ) { + if ( messageLogs == null ) + { return; } HashMap logs; - synchronized ( messageLogs ) { + synchronized ( messageLogs ) + { logs = ( HashMap )messageLogs.get( currentTxLog ); - if ( logs == null ) { + if ( logs == null ) + { logs = new HashMap(); messageLogs.put( currentTxLog, logs ); } - synchronized ( logs ) { + + synchronized ( logs ) + { LogInfo logInfo = ( LogInfo )logs.get( dest.toString() ); - if ( logInfo == null ) { - try { + if ( logInfo == null ) + { + try + { SpyMessageLog log = new SpyMessageLog( new URL( dataDirURL, dest.toString() + ".dat" + numRollOvers ).getFile() ); logInfo = new LogInfo( log, dest, currentTxLog ); logs.put( "" + dest, logInfo ); - } catch ( java.net.MalformedURLException e ) { + } catch ( java.net.MalformedURLException e ) + { JMSException jme = new SpyJMSException( "Error rolling over logs to new files." ); jme.setLinkedException( e ); throw jme; @@ -141,48 +175,61 @@ } public void destroyQueue( SpyDestination dest ) - throws javax.jms.JMSException { + throws javax.jms.JMSException + { - try { + try + { String key = "" + dest; queues.remove( key ); SpyMessageLog log = null; HashMap logs; - synchronized ( messageLogs ) { + synchronized ( messageLogs ) + { logs = ( HashMap )messageLogs.get( currentTxLog ); } - synchronized ( logs ) { + synchronized ( logs ) + { log = ( SpyMessageLog )logs.remove( key ); } - if ( log == null ) { + if ( log == null ) + { throw new SpyJMSException( "The persistence log was never initialized" ); } log.close(); log.delete(); HashSet deleteLogs = new HashSet(); - synchronized ( messageLogs ) { - for ( Iterator it = messageLogs.values().iterator(); it.hasNext(); ) { + synchronized ( messageLogs ) + { + for ( Iterator it = messageLogs.values().iterator(); it.hasNext(); ) + { logs = ( HashMap )it.next(); - synchronized ( logs ) { + synchronized ( logs ) + { log = ( SpyMessageLog )logs.remove( key ); } - if ( log != null ) { + if ( log != null ) + { deleteLogs.add( log ); } } } - for ( Iterator it = deleteLogs.iterator(); it.hasNext(); ) { + + for ( Iterator it = deleteLogs.iterator(); it.hasNext(); ) + { log = ( SpyMessageLog )it.next(); log.close(); log.delete(); } - } catch ( javax.jms.JMSException e ) { + } catch ( javax.jms.JMSException e ) + { throw e; - } catch ( Exception e ) { + } catch ( Exception e ) + { javax.jms.JMSException newE = new javax.jms.JMSException( "Invalid configuration." ); newE.setLinkedException( e ); throw newE; @@ -192,11 +239,9 @@ public void initService() - throws Exception { - - if ( DEBUG ) { - System.out.println( "Using new rolling logged persistence manager." ); - } + throws Exception + { + log.debug( "Using rolling logged persistence manager." ); URL configFile = getClass().getClassLoader().getResource( "jboss.jcml" ); dataDirURL = new URL( configFile, dataDirectory ); @@ -206,52 +251,69 @@ }, new String[]{ } ); server.setPersistenceManager( this ); - } public void startService() - throws Exception { + throws Exception + { JMSServer server = ( JMSServer )getServer().invoke( new ObjectName( org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME ), "getJMSServer", new Object[]{ }, new String[]{ } ); restore( server ); - } public void add( org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId ) - throws javax.jms.JMSException { - //System.out.println("Add message "+Long.toHexString(message.messageId)+" in trans "+Long.toHexString(txId.longValue())+" to "+message.getJMSDestination()); + throws javax.jms.JMSException + { + if( log.isTraceEnabled() ) + { + log.trace( "Add message " + Long.toHexString(message.messageId) + + "in trans " + Long.toHexString(txId.longValue()) + + " to " + message.getJMSDestination() ); + } + LogInfo logInfo; SpyTxLog txLog = null; - if ( txId == null ) { + if ( txId == null ) + { txLog = currentTxLog; - } else { - synchronized ( transToTxLogs ) { + } else + { + synchronized ( transToTxLogs ) + { txLog = ( ( TxInfo )transToTxLogs.get( txId ) ).log; } } HashMap logs; - synchronized ( messageLogs ) { + synchronized ( messageLogs ) + { logs = ( HashMap )messageLogs.get( txLog ); } - synchronized ( logs ) { + + synchronized ( logs ) + { logInfo = ( LogInfo )logs.get( message.getJMSDestination().toString() ); } - if ( logInfo == null ) { + if ( logInfo == null ) + { throw new javax.jms.JMSException( "Destination was not initalized with the PersistenceManager" ); } - synchronized ( logInfo ) { + synchronized ( logInfo ) + { logInfo.liveMessages++; message.persistData = logInfo; logInfo.log.add( message, txId ); } - if ( txId != null ) { - synchronized ( transToTxLogs ) { + + if ( txId != null ) + { + synchronized ( transToTxLogs ) + { TxInfo txInfo = ( TxInfo )transToTxLogs.get( txId ); txInfo.addMessages.add( message ); } @@ -260,17 +322,25 @@ } public void commitPersistentTx( org.jboss.mq.pm.Tx txId ) - throws javax.jms.JMSException { - //System.out.println("Committing TX "+Long.toHexString(txId.longValue())); + throws javax.jms.JMSException + { + if( log.isTraceEnabled() ) + { + log.trace( "Comitting TX " + Long.toHexString(txId.longValue()) ); + } + TxInfo info = null; LinkedList messagesToDelete = null; - synchronized ( transToTxLogs ) { + + synchronized ( transToTxLogs ) + { info = ( TxInfo )transToTxLogs.remove( txId ); messagesToDelete = info.ackMessages; } deleteMessages( messagesToDelete ); info.log.commitTx( txId ); - synchronized ( transToTxLogs ) { + synchronized ( transToTxLogs ) + { releaseTx( txId ); releaseTxInfo( info ); } @@ -278,7 +348,8 @@ } public org.jboss.mq.pm.Tx createPersistentTx() - throws javax.jms.JMSException { + throws javax.jms.JMSException + { org.jboss.mq.pm.Tx txId = null; SpyTxLog txLog = currentTxLog; synchronized ( transToTxLogs ) { @@ -290,32 +361,46 @@ } public void remove( org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId ) - throws javax.jms.JMSException { - //System.out.println("Removing message "+Long.toHexString(message.messageId)+" in trans "+Long.toHexString(txId.longValue())+" from "+message.getJMSDestination()); + throws javax.jms.JMSException + { + if( log.isTraceEnabled() ) + { + log.trace( "Removing message " + Long.toHexString(message.messageId) + + " in trans " + Long.toHexString( txId.longValue() ) + + " from " + message.getJMSDestination() ); + } LogInfo logInfo; SpyTxLog txLog = ( ( LogInfo )message.persistData ).txLog; - synchronized ( messageLogs ) { + synchronized ( messageLogs ) + { HashMap logs = ( HashMap )messageLogs.get( txLog ); logInfo = ( LogInfo )logs.get( message.getJMSDestination().toString() ); } - if ( logInfo == null ) { + if ( logInfo == null ) + { throw new javax.jms.JMSException( "Destination was not initalized with the PersistenceManager" ); } - synchronized ( logInfo.log ) { + synchronized ( logInfo.log ) + { logInfo.log.remove( message, txId ); } - if ( txId != null ) { - synchronized ( transToTxLogs ) { + + if ( txId != null ) + { + synchronized ( transToTxLogs ) + { TxInfo txInfo = ( TxInfo )transToTxLogs.get( txId ); txInfo.ackMessages.add( message ); } } - if ( txId == null ) { - synchronized ( logInfo ) { + if ( txId == null ) + { + synchronized ( logInfo ) + { --logInfo.liveMessages; } checkCleanup( txLog ); @@ -323,7 +408,8 @@ } public void restore( org.jboss.mq.server.JMSServer server ) - throws javax.jms.JMSException { + throws javax.jms.JMSException + { TreeSet commitedTxs = new TreeSet(); HashMap txLogs = new HashMap(); @@ -332,13 +418,17 @@ messageLogs = new HashMap(); - for ( int i = 0; i < dataFiles.length; ++i ) { + for ( int i = 0; i < dataFiles.length; ++i ) + { String name = dataFiles[i].getName(); - if ( name.startsWith( TRANS_FILE_NAME ) ) { + if ( name.startsWith( TRANS_FILE_NAME ) ) + { int index = name.indexOf( ".dat" ); - if ( index < 0 ) { + if ( index < 0 ) + { continue; } + String sRollOver = name.substring( index + 4 ); int rollOver = Integer.parseInt( sRollOver ); numRollOvers = Math.max( numRollOvers, rollOver + 1 ); @@ -349,15 +439,19 @@ } } - if ( !commitedTxs.isEmpty() ) { + if ( !commitedTxs.isEmpty() ) + { nextTxId = ( ( org.jboss.mq.pm.Tx )commitedTxs.last() ).longValue(); } - for ( int i = 0; i < dataFiles.length; ++i ) { + for ( int i = 0; i < dataFiles.length; ++i ) + { String name = dataFiles[i].getName(); - if ( !name.startsWith( TRANS_FILE_NAME ) ) { + if ( !name.startsWith( TRANS_FILE_NAME ) ) + { int index = name.indexOf( ".dat" ); - if ( index < 0 ) { + if ( index < 0 ) + { continue; } String sRollOver = name.substring( index + 4 ); @@ -367,15 +461,19 @@ SpyMessage[] messages = messageLog.restore( commitedTxs ); SpyTxLog txLog = ( SpyTxLog )txLogs.get( new Integer( rollOver ) ); SpyDestination dest = ( SpyDestination )queues.get( key ); - if ( dest != null ) { + if ( dest != null ) + { JMSDestination q = server.getJMSDestination( dest ); LogInfo info = new LogInfo( messageLog, dest, txLog ); info.liveMessages = messages.length; HashMap logs = ( HashMap )messageLogs.get( txLog ); logs.put( key, info ); + //TODO: make sure this lock is good enough - synchronized ( q ) { - for ( int j = 0; j < messages.length; j++ ) { + synchronized ( q ) + { + for ( int j = 0; j < messages.length; j++ ) + { messages[j].persistData = info; q.restoreMessage( messages[j] ); } @@ -384,30 +482,34 @@ } } - for ( Iterator it = txLogs.values().iterator(); it.hasNext(); ) { + for ( Iterator it = txLogs.values().iterator(); it.hasNext(); ) + { checkCleanup( ( SpyTxLog )it.next() ); } - try { - + try + { URL txLogFile = new URL( dataDirURL, TRANS_FILE_NAME + numRollOvers ); currentTxLog = new SpyTxLog( txLogFile.getFile() ); messageLogs.put( currentTxLog, new HashMap() ); - for ( Iterator it = queues.values().iterator(); it.hasNext(); ) { + for ( Iterator it = queues.values().iterator(); it.hasNext(); ) + { SpyDestination dest = ( SpyDestination )it.next(); String key = "" + dest; URL logFile = new URL( dataDirURL, dest.toString() + ".dat" + numRollOvers ); SpyMessageLog log = new SpyMessageLog( logFile.getFile() ); - synchronized ( messageLogs ) { + synchronized ( messageLogs ) + { LogInfo logInfo = new LogInfo( log, dest, currentTxLog ); HashMap logs = ( HashMap )messageLogs.get( currentTxLog ); logs.put( key, logInfo ); } } - } catch ( Exception e ) { + } catch ( Exception e ) + { javax.jms.JMSException newE = new javax.jms.JMSException( "Invalid configuration." ); newE.setLinkedException( e ); throw newE; @@ -415,58 +517,70 @@ } public void rollbackPersistentTx( org.jboss.mq.pm.Tx txId ) - throws javax.jms.JMSException { + throws javax.jms.JMSException + { TxInfo info = null; LinkedList messagesToDelete = null; - synchronized ( transToTxLogs ) { + synchronized ( transToTxLogs ) + { info = ( TxInfo )transToTxLogs.remove( txId ); messagesToDelete = info.addMessages; } deleteMessages( messagesToDelete ); info.log.rollbackTx( txId ); - synchronized ( transToTxLogs ) { + synchronized ( transToTxLogs ) + { releaseTx( txId ); releaseTxInfo( info ); } checkCleanup( info.log ); } - protected org.jboss.mq.pm.Tx getTx( long value ) { - if ( txPool.isEmpty() ) { + protected org.jboss.mq.pm.Tx getTx( long value ) + { + if ( txPool.isEmpty() ) + { return new org.jboss.mq.pm.Tx( value ); - } else { + } else + { org.jboss.mq.pm.Tx tx = ( org.jboss.mq.pm.Tx )txPool.remove( listPool.size() - 1 ); tx.setValue( value ); return tx; } } - protected TxInfo getTxInfo( org.jboss.mq.pm.Tx txId, SpyTxLog txLog ) { - if ( listPool.isEmpty() ) { + protected TxInfo getTxInfo( org.jboss.mq.pm.Tx txId, SpyTxLog txLog ) + { + if ( listPool.isEmpty() ) + { return new TxInfo( txId, txLog ); - } else { + } else + { TxInfo info = ( TxInfo )listPool.remove( listPool.size() - 1 ); info.txId = txId; info.log = txLog; return info; } } - - protected void releaseTxInfo( TxInfo list ) { - if ( listPool.size() < MAX_POOL_SIZE ) { + protected void releaseTxInfo( TxInfo list ) + { + if ( listPool.size() < MAX_POOL_SIZE ) + { list.ackMessages.clear(); list.addMessages.clear(); listPool.add( list ); } } - protected void deleteMessages( LinkedList messages ) - throws javax.jms.JMSException { - for ( Iterator it = messages.iterator(); it.hasNext(); ) { + throws javax.jms.JMSException + { + for ( Iterator it = messages.iterator(); it.hasNext(); ) + { LogInfo info = ( ( LogInfo )( ( SpyMessage )it.next() ).persistData ); - synchronized ( info ) { + synchronized ( info ) + { --info.liveMessages; } checkCleanup( info.txLog ); @@ -474,15 +588,21 @@ } protected void checkRollOver() - throws JMSException { - synchronized ( queues ) { + throws JMSException + { + synchronized ( queues ) + { int max = queues.size(); - if ( max == 0 ) { - max = ROLL_OVER_SIZE; - } else { - max *= ROLL_OVER_SIZE; + if ( max == 0 ) + { + max = rollOverSize; + } else + { + max *= rollOverSize; } - if ( ++messageCounter > max ) { + + if ( ++messageCounter > max ) + { messageCounter = 0; rollOverLogs(); } @@ -490,25 +610,31 @@ } protected void rollOverLogs() - throws JMSException { - try { + throws JMSException + { + try + { HashMap logs = new HashMap(); ++numRollOvers; SpyTxLog newTxLog = new SpyTxLog( new URL( dataDirURL, TRANS_FILE_NAME + numRollOvers ).getFile() ); - for ( Iterator it = queues.values().iterator(); it.hasNext(); ) { + for ( Iterator it = queues.values().iterator(); it.hasNext(); ) + { SpyDestination dest = ( SpyDestination )it.next(); SpyMessageLog log = new SpyMessageLog( new URL( dataDirURL, dest.toString() + ".dat" + numRollOvers ).getFile() ); LogInfo logInfo = new LogInfo( log, dest, newTxLog ); logs.put( "" + dest, logInfo ); } + SpyTxLog oldLog = currentTxLog; - synchronized ( messageLogs ) { + synchronized ( messageLogs ) + { currentTxLog = newTxLog; messageLogs.put( newTxLog, logs ); } checkCleanup( oldLog ); - } catch ( java.net.MalformedURLException e ) { + } catch ( java.net.MalformedURLException e ) + { JMSException jme = new SpyJMSException( "Error rolling over logs to new files." ); jme.setLinkedException( e ); throw jme; @@ -516,49 +642,62 @@ } protected void checkCleanup( SpyTxLog txLog ) - throws JMSException { - if ( txLog == currentTxLog ) { + throws JMSException + { + if ( txLog == currentTxLog ) + { return; } HashMap logs; - synchronized ( messageLogs ) { + synchronized ( messageLogs ) + { logs = ( HashMap )messageLogs.get( txLog ); } - synchronized ( logs ) { + synchronized ( logs ) + { //if no live messages and no live transactions then cleanup - for ( Iterator it = logs.values().iterator(); it.hasNext(); ) { + for ( Iterator it = logs.values().iterator(); it.hasNext(); ) + { LogInfo info = ( LogInfo )it.next(); - synchronized ( info ) { - if ( info.liveMessages != 0 ) { + synchronized ( info ) + { + if ( info.liveMessages != 0 ) + { return; } } } } - if ( !txLog.completed() ) { + + if ( !txLog.completed() ) + { return; - } - if ( DEBUG ) { - System.out.println( "Cleaning up" ); } + + log.debug( "Cleanign up" ); //close and delete all logs, remove data from data structures. - synchronized ( messageLogs ) { + synchronized ( messageLogs ) + { logs = ( HashMap )messageLogs.remove( txLog ); } - if ( logs == null ) { + if ( logs == null ) + { return; } txLog.close(); txLog.delete(); - for ( Iterator it = logs.values().iterator(); it.hasNext(); ) { + for ( Iterator it = logs.values().iterator(); it.hasNext(); ) + { LogInfo info = ( LogInfo )it.next(); info.log.close(); info.log.delete(); } } - protected void releaseTx( org.jboss.mq.pm.Tx tx ) { - if ( txPool.size() < MAX_POOL_SIZE ) { + protected void releaseTx( org.jboss.mq.pm.Tx tx ) + { + if ( txPool.size() < MAX_POOL_SIZE ) + { txPool.add( tx ); } } @@ -566,32 +705,36 @@ /** * @created August 16, 2001 */ - static class LogInfo { + static class LogInfo + { SpyMessageLog log; SpyDestination destination; int liveMessages = 0; SpyTxLog txLog; - LogInfo( SpyMessageLog log, SpyDestination destination, SpyTxLog txLog ) { + LogInfo( SpyMessageLog log, SpyDestination destination, SpyTxLog txLog ) + { this.log = log; this.destination = destination; this.txLog = txLog; } - } /** * @created August 16, 2001 */ - static class TxInfo { + static class TxInfo + { org.jboss.mq.pm.Tx txId; LinkedList addMessages = new LinkedList(); LinkedList ackMessages = new LinkedList(); SpyTxLog log; - TxInfo( org.jboss.mq.pm.Tx txId, SpyTxLog log ) { + TxInfo( org.jboss.mq.pm.Tx txId, SpyTxLog log ) + { this.txId = txId; this.log = log; } } + }
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development