User: chirino Date: 02/03/27 19:08:04 Added: src/main/org/jboss/mq/pm/jdbc2 PersistenceManager.java PersistenceManagerMBean.java Log: Adding a new JDBC PersistenceManager/CacheStore. Better than the original: - does not need the NoTransDS anymore. - All SQL statements are configurable - Supports restoring the same queue multiple times. (for when the queue is deployed<->undeployed) - Easier on memory usage. - CacheManager does not store messages that are alleady stored in the DB. All that's left is more testing! Revision Changes Path 1.1 jbossmq/src/main/org/jboss/mq/pm/jdbc2/PersistenceManager.java Index: PersistenceManager.java =================================================================== /* * JBossMQ, the OpenSource JMS implementation * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.mq.pm.jdbc2; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.StreamCorruptedException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; import java.util.Iterator; import java.util.LinkedList; import java.util.Properties; import javax.jms.JMSException; import javax.management.ObjectName; import javax.naming.InitialContext; import javax.sql.DataSource; import javax.transaction.Status; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import org.jboss.mq.SpyDestination; import org.jboss.mq.SpyJMSException; import org.jboss.mq.SpyMessage; import org.jboss.mq.pm.TxManager; import org.jboss.mq.server.JMSDestination; import org.jboss.mq.server.MessageCache; import org.jboss.mq.server.MessageReference; import org.jboss.system.ServiceMBeanSupport; import org.jboss.tm.TransactionManagerService; /** * This class manages all persistence related services for JDBC based * persistence. * * @author: Jayesh Parayali ([EMAIL PROTECTED]) * @author: Hiram Chirino ([EMAIL PROTECTED]) * * @version $Revision: 1.1 $ */ public class PersistenceManager extends ServiceMBeanSupport implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, org.jboss.mq.pm.CacheStore { ///////////////////////////////////////////////////////////////////////////////// // // TX state attibutes // ///////////////////////////////////////////////////////////////////////////////// private long nextTransactionId = 0; private TxManager txManager; private DataSource datasource; private MessageCache messageCache; private TransactionManager tm; ///////////////////////////////////////////////////////////////////////////////// // // JDBC Access Attributes // ///////////////////////////////////////////////////////////////////////////////// String SELECT_ALL_UNCOMMITED_TXS = "SELECT TXID FROM JMS_TRANSACTIONS"; String DELETE_ALL_MESSAGE_WITH_TX = "DELETE FROM JMS_MESSAGES WHERE TXID=?"; String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?"; String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?"; String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)"; String SELECT_MAX_TX = "SELECT MAX(TXID) FROM JMS_MESSAGES"; String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?"; String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?"; String INSERT_MESSAGE = "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)"; String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET (TXID, TXOP) VALUES(?,?) WHERE MESSAGEID=? AND DESTINATION=?"; String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?"; String CREATE_MESSAGE_TABLE = "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, " + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1)," + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )"; String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER )"; static final int OBJECT_BLOB = 0; static final int BYTES_BLOB = 1; static final int BINARYSTREAM_BLOB = 2; static final int BLOB_BLOB = 3; int blobType = OBJECT_BLOB; boolean createTables; ///////////////////////////////////////////////////////////////////////////////// // // Constructor. // ///////////////////////////////////////////////////////////////////////////////// public PersistenceManager() throws javax.jms.JMSException { txManager = new TxManager(this); } /** * This inner class helps handle the tx management of the jdbc connections. * */ class TransactionManagerStrategy { Transaction threadTx; Transaction newTransaction; void startTX() throws JMSException { //log.debug("starting a new TM transaction"); try { // Thread arriving must be clean (jboss doesn't set the thread // previously). However optimized calls come with associated // thread for example. We suspend the thread association here, and // resume in the finally block of the following try. threadTx = tm.suspend(); // Always begin a transaction tm.begin(); // get it newTransaction = tm.getTransaction(); } catch (Exception e) { try { if (threadTx != null) tm.resume(threadTx); } catch (Exception ignore) { } throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); } } void setRollbackOnly() throws JMSException { //log.debug("rolling back a TM transaction"); try { newTransaction.setRollbackOnly(); } catch (Exception e) { throw new SpyJMSException("Could not start a mark the transaction for rollback .", e); } } void endTX() throws JMSException { //log.debug("ending TM transaction."); try { if (newTransaction.getStatus() == Status.STATUS_MARKED_ROLLBACK) { newTransaction.rollback(); } else { newTransaction.commit(); } } catch (Exception e) { throw new SpyJMSException("Could not start a transaction with the transaction manager.", e); } finally { try { if (threadTx != null) tm.resume(threadTx); } catch (Exception ignore) { } } } } ///////////////////////////////////////////////////////////////////////////////// // // TX Resolution. // ///////////////////////////////////////////////////////////////////////////////// synchronized public void resolveAllUncommitedTXs() throws JMSException { TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; ResultSet rs = null; try { c = datasource.getConnection(); if (createTables) { try { stmt = c.prepareStatement(CREATE_MESSAGE_TABLE); stmt.executeUpdate(); stmt.close(); } catch (SQLException e) { log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE + ", got : " + e); } try { stmt = c.prepareStatement(CREATE_TX_TABLE); stmt.executeUpdate(); stmt.close(); } catch (SQLException e) { log.debug("Could not create table with SQL: " + CREATE_TX_TABLE + ", got : " + e); } } // Delete all the messages that were added but thier tx's were not commited. stmt = c.prepareStatement(DELETE_ALL_MESSAGE_WITH_TX); Collection unresolvedTXs = findAllUncommitedTXs(c); Iterator i = unresolvedTXs.iterator(); while (i.hasNext()) { long txid = ((Long) i.next()).longValue(); stmt.setLong(1, txid); stmt.executeUpdate(); } stmt.close(); // Delete all the non persistent messages that were added by the // CacheStore interface of this PM removeMarkedMessages(c, null, "T"); // Find out what the next TXID should be stmt = c.prepareStatement(SELECT_MAX_TX); rs = stmt.executeQuery(); if (rs.next()) { nextTransactionId = rs.getLong(1) + 1; } } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e); } finally { try { rs.close(); } catch (Throwable ignore) { } try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } public Collection findAllUncommitedTXs(Connection con) throws SQLException { LinkedList items = new LinkedList(); PreparedStatement stmt = null; ResultSet rs = null; try { stmt = con.prepareStatement(SELECT_ALL_UNCOMMITED_TXS); rs = stmt.executeQuery(); while (rs.next()) { long id = rs.getLong(1); items.add(new Long(id)); } } finally { try { rs.close(); } catch (Throwable ignore) { } try { stmt.close(); } catch (Throwable ignore) { } } return items; } ///////////////////////////////////////////////////////////////////////////////// // // Message Recovery // ///////////////////////////////////////////////////////////////////////////////// synchronized public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws javax.jms.JMSException { if (jmsDest == null) throw new IllegalArgumentException("Must supply non null JMSDestination to restoreQueue"); if (dest == null) throw new IllegalArgumentException("Must supply non null SpyDestination to restoreQueue"); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; ResultSet rs = null; try { c = datasource.getConnection(); stmt = c.prepareStatement(SELECT_MESSAGES_IN_DEST); stmt.setString(1, dest.getName()); rs = stmt.executeQuery(); while (rs.next()) { SpyMessage message = extractMessage(rs); MessageReference mr = messageCache.add(message); mr.setStored(((SpyDestination) message.getJMSDestination()).getName()); jmsDest.restoreMessage(mr); } } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not restore messages to destination : " + dest.getName(), e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not restore messages to destination : " + dest.getName(), e); } finally { try { rs.close(); } catch (Throwable ignore) { } try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } SpyMessage extractMessage(ResultSet rs) throws SQLException, IOException { try { long messageid = rs.getLong(1); SpyMessage message = null; if (blobType == OBJECT_BLOB) { message = (SpyMessage) rs.getObject(2); } else if (blobType == BYTES_BLOB) { byte[] st = rs.getBytes(2); ByteArrayInputStream baip = new ByteArrayInputStream(st); ObjectInputStream ois = new ObjectInputStream(baip); message = (SpyMessage) ois.readObject(); } else if (blobType == BINARYSTREAM_BLOB) { ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(2)); message = (SpyMessage) ois.readObject(); } else if (blobType == BLOB_BLOB) { ObjectInputStream ois = new ObjectInputStream(rs.getBlob(2).getBinaryStream()); message = (SpyMessage) ois.readObject(); } message.header.messageId = messageid; return message; } catch (ClassNotFoundException e) { throw new IOException("Could not load the message: " + e); } catch (StreamCorruptedException e) { throw new IOException("Could not load the message: " + e); } } ///////////////////////////////////////////////////////////////////////////////// // // TX Commit // ///////////////////////////////////////////////////////////////////////////////// public void commitPersistentTx(org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException { TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; try { c = datasource.getConnection(); removeMarkedMessages(c, txId, "D"); removeTXRecord(c, txId.longValue()); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not commit tx: " + txId, e); } finally { try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } public void removeMarkedMessages(Connection c, org.jboss.mq.pm.Tx txid, String mark) throws SQLException { PreparedStatement stmt = null; try { stmt = c.prepareStatement(DELETE_MARKED_MESSAGES); if (txid != null) stmt.setLong(1, txid.longValue()); else stmt.setObject(1, null); stmt.setString(2, mark); stmt.executeUpdate(); } finally { try { stmt.close(); } catch (Throwable e) { } } } public void removeTXRecord(Connection c, long txid) throws SQLException { PreparedStatement stmt = null; try { stmt = c.prepareStatement(DELETE_TX); stmt.setLong(1, txid); stmt.executeUpdate(); } finally { try { stmt.close(); } catch (Throwable e) { } } } ///////////////////////////////////////////////////////////////////////////////// // // TX Rollback // ///////////////////////////////////////////////////////////////////////////////// public void rollbackPersistentTx(org.jboss.mq.pm.Tx txId) throws JMSException { TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; try { c = datasource.getConnection(); removeMarkedMessages(c, txId, "A"); removeTXRecord(c, txId.longValue()); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not rollback tx: " + txId, e); } finally { try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } ///////////////////////////////////////////////////////////////////////////////// // // TX Creation // ///////////////////////////////////////////////////////////////////////////////// public org.jboss.mq.pm.Tx createPersistentTx() throws JMSException { org.jboss.mq.pm.Tx id = new org.jboss.mq.pm.Tx(nextTransactionId++); TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; try { c = datasource.getConnection(); stmt = c.prepareStatement(INSERT_TX); stmt.setLong(1, id.longValue()); stmt.executeUpdate(); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not crate tx: " + id, e); } finally { try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } return id; } ///////////////////////////////////////////////////////////////////////////////// // // Adding a message // ///////////////////////////////////////////////////////////////////////////////// public void add(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException { // LogInfo logInfo; TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; try { // has it allready been stored by the message cache interface?? if (messageRef.isStored) { //update the stored record, markMessage(c, messageRef.messageId, (String) messageRef.persistData, txId, "A"); } else { SpyMessage message = messageRef.getMessage(); c = datasource.getConnection(); add(c, message, txId, "A"); messageRef.setStored(((SpyDestination) message.getJMSDestination()).getName()); } } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not store message: " + messageRef.messageId, e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not store message: " + messageRef.messageId, e); } finally { try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } public void add(Connection c, SpyMessage message, org.jboss.mq.pm.Tx txId, String mark) throws SQLException, IOException { PreparedStatement stmt = null; try { stmt = c.prepareStatement(INSERT_MESSAGE); stmt.setLong(1, message.header.messageId); stmt.setString(2, ((SpyDestination) message.getJMSDestination()).getName()); if (blobType == OBJECT_BLOB) { stmt.setObject(3, message); } else if (blobType == BYTES_BLOB) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(message); byte[] messageAsBytes = baos.toByteArray(); stmt.setBytes(3, messageAsBytes); } else if (blobType == BINARYSTREAM_BLOB) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(baos); oos.writeObject(message); byte[] messageAsBytes = baos.toByteArray(); ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes); stmt.setBinaryStream(3, bais, messageAsBytes.length); } else if (blobType == BLOB_BLOB) { /** TODO: ByteArrayOutputStream baos= new ByteArrayOutputStream(); ObjectOutputStream oos= new ObjectOutputStream(baos); oos.writeObject(message); byte[] messageAsBytes= baos.toByteArray(); ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes); stmt.setBsetBinaryStream(3, bais, messageAsBytes.length); */ } if (txId != null) stmt.setLong(4, txId.longValue()); else stmt.setObject(4, null); stmt.setString(5, mark); stmt.executeUpdate(); } finally { try { stmt.close(); } catch (Throwable ignore) { } } } public void markMessage(Connection c, long messageid, String destination, org.jboss.mq.pm.Tx txId, String mark) throws SQLException { // LogInfo logInfo; PreparedStatement stmt = null; try { stmt = c.prepareStatement(MARK_MESSAGE); if (txId == null) { stmt.setObject(1, null); } else { stmt.setLong(1, txId.longValue()); } stmt.setString(2, mark); stmt.setLong(3, messageid); stmt.setString(4, destination); stmt.executeUpdate(); } finally { try { stmt.close(); } catch (Throwable ignore) { } } } ///////////////////////////////////////////////////////////////////////////////// // // Removing a message // ///////////////////////////////////////////////////////////////////////////////// public void remove(MessageReference messageRef, org.jboss.mq.pm.Tx txId) throws javax.jms.JMSException { // LogInfo logInfo; TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; try { c = datasource.getConnection(); if (txId == null) { stmt = c.prepareStatement(DELETE_MESSAGE); stmt.setLong(1, messageRef.messageId); stmt.setString(2, (String) messageRef.persistData); stmt.executeUpdate(); } else { stmt = c.prepareStatement(MARK_MESSAGE); stmt.setLong(1, txId.longValue()); stmt.setString(2, "D"); stmt.setLong(3, messageRef.messageId); stmt.setString(4, (String) messageRef.persistData); stmt.executeUpdate(); } } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not remove message: " + messageRef, e); } finally { try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } ///////////////////////////////////////////////////////////////////////////////// // // Misc. PM functions // ///////////////////////////////////////////////////////////////////////////////// public org.jboss.mq.pm.TxManager getTxManager() { return txManager; } /* * @see PersistenceManager#closeQueue(JMSDestination, SpyDestination) */ public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException { // Nothing to clean up, all the state is in the db. } public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException { // Should we delete all the records in the destination??? } /* * @see CacheStore#loadFromStorage(MessageReference) */ public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException { TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; ResultSet rs = null; try { c = datasource.getConnection(); stmt = c.prepareStatement(SELECT_MESSAGE); stmt.setLong(1, messageRef.messageId); stmt.setString(2, (String) messageRef.persistData); rs = stmt.executeQuery(); if (rs.next()) return extractMessage(rs); return null; } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not load message : " + messageRef, e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not load message : " + messageRef, e); } finally { try { rs.close(); } catch (Throwable ignore) { } try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } ///////////////////////////////////////////////////////////////////////////////// // // CacheStore Functions // ///////////////////////////////////////////////////////////////////////////////// public void removeFromStorage(MessageReference messageRef) throws JMSException { // LogInfo logInfo; TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; try { c = datasource.getConnection(); stmt = c.prepareStatement(DELETE_MESSAGE); stmt.setLong(1, messageRef.messageId); stmt.setString(2, (String) messageRef.persistData); stmt.executeUpdate(); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not remove message: " + messageRef.messageId, e); } finally { try { stmt.close(); } catch (Throwable ignore) { } try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } /* * @see CacheStore#saveToStorage(MessageReference, SpyMessage) */ public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException { // LogInfo logInfo; TransactionManagerStrategy tms = new TransactionManagerStrategy(); tms.startTX(); Connection c = null; PreparedStatement stmt = null; try { c = datasource.getConnection(); add(c, message, null, "T"); messageRef.setStored(((SpyDestination) message.getJMSDestination()).getName()); } catch (IOException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not store message: " + messageRef.messageId, e); } catch (SQLException e) { tms.setRollbackOnly(); throw new SpyJMSException("Could not store message: " + messageRef.messageId, e); } finally { try { c.close(); } catch (Throwable ignore) { } tms.endTX(); } } ///////////////////////////////////////////////////////////////////////////////// // // JMX Interface // ///////////////////////////////////////////////////////////////////////////////// private ObjectName messageCacheName; private ObjectName dataSourceName; private Properties sqlProperties = new Properties(); public void startService() throws Exception { log.debug("Starting"); SELECT_ALL_UNCOMMITED_TXS = sqlProperties.getProperty("SELECT_ALL_UNCOMMITED_TXS", SELECT_ALL_UNCOMMITED_TXS); DELETE_ALL_MESSAGE_WITH_TX = sqlProperties.getProperty("DELETE_ALL_MESSAGE_WITH_TX", DELETE_ALL_MESSAGE_WITH_TX); DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX); DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES); INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX); SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX); SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST); SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE); INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE); MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE); DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE); CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE); CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE); createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equals("true"); String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB"); if (s.equals("OBJECT_BLOB")) { blobType = OBJECT_BLOB; } else if (s.equals("BYTES_BLOB")) { blobType = BYTES_BLOB; } else if (s.equals("BINARYSTREAM_BLOB")) { blobType = BINARYSTREAM_BLOB; } //Find the ConnectionFactoryLoader MBean so we can find the datasource String dsName = (String) getServer().getAttribute(dataSourceName, "JndiName"); //Get an InitialContext InitialContext ctx = new InitialContext(); datasource = (DataSource) ctx.lookup("java:/" + dsName); //Get the Transaction Manager so we can control the jdbc tx tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME); messageCache = (MessageCache) getServer().getAttribute(messageCacheName, "Instance"); log.debug("Resolving uncommited TXS"); resolveAllUncommitedTXs(); log.debug("Started"); } public Object getInstance() { return this; } public ObjectName getMessageCache() { return messageCacheName; } public void setMessageCache(ObjectName messageCache) { this.messageCacheName = messageCache; } public ObjectName getDataSource() { return dataSourceName; } public void setDataSource(ObjectName dataSourceName) { this.dataSourceName = dataSourceName; } public String getName() { return "JBossMQ-jdbc-PersistenceManager"; } public MessageCache getMessageCacheInstance() { return messageCache; } /** * Gets the sqlProperties. * @return Returns a Properties */ public String getSqlProperties() { try { ByteArrayOutputStream boa = new ByteArrayOutputStream(); sqlProperties.store(boa, ""); return new String(boa.toByteArray()); } catch (IOException shouldnothappen) { return ""; } } /** * Sets the sqlProperties. * @param sqlProperties The sqlProperties to set */ public void setSqlProperties(String value) { try { ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes()); sqlProperties = new Properties(); sqlProperties.load(is); } catch (IOException shouldnothappen) { } } } 1.1 jbossmq/src/main/org/jboss/mq/pm/jdbc2/PersistenceManagerMBean.java Index: PersistenceManagerMBean.java =================================================================== /* * JBoss, the OpenSource J2EE webOS * * Distributable under LGPL license. * See terms of license at gnu.org. */ package org.jboss.mq.pm.jdbc2; import javax.management.ObjectName; import java.util.Properties; import org.jboss.system.ServiceMBean; /** * MBean interface for the JBossMQ JMX service. * * @author Vincent Sheffer ([EMAIL PROTECTED]) * @version $Revision: 1.1 $ */ public interface PersistenceManagerMBean extends ServiceMBean, org.jboss.mq.pm.PersistenceManagerMBean, org.jboss.mq.pm.CacheStoreMBean { ObjectName getDataSource(); void setDataSource(ObjectName dataSource); public String getSqlProperties(); public void setSqlProperties(String sqlProperties); }
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development