User: chirino
Date: 01/08/30 21:39:08
Modified: src/main/org/jboss/mq/pm/jdbc PersistenceManager.java
MessageLog.java
Log:
This PM was trying to get JNDI to a DataSource too soon. Moved to when the PM is
started.
Still need to test.
Revision Changes Path
1.4 +269 -301 jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java
Index: PersistenceManager.java
===================================================================
RCS file:
/cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/PersistenceManager.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- PersistenceManager.java 2001/08/30 02:35:55 1.3
+++ PersistenceManager.java 2001/08/31 04:39:08 1.4
@@ -5,341 +5,309 @@
* See terms of license at gnu.org.
*/
package org.jboss.mq.pm.jdbc;
-import java.io.*;
+
+import javax.rmi.PortableRemoteObject;
+import javax.jms.JMSException;
+import javax.sql.*;
+import javax.naming.*;
+import javax.management.*;
+import javax.naming.InitialContext;
import java.net.URL;
-import java.sql.*;
import java.util.HashMap;
+import java.util.TreeSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Properties;
-import java.util.TreeSet;
-import javax.jms.JMSException;
-
-import javax.management.*;
-import javax.naming.*;
-
-import javax.naming.InitialContext;
-
-import javax.rmi.PortableRemoteObject;
-import javax.sql.*;
-import org.jboss.mq.ConnectionToken;
+import java.sql.*;
+import java.io.*;
+import org.jboss.util.ServiceMBeanSupport;
import org.jboss.mq.SpyDestination;
-import org.jboss.mq.SpyJMSException;
-import org.jboss.mq.SpyMessage;
-import org.jboss.mq.pm.TxManager;
+import org.jboss.mq.xml.XElement;
+import org.jboss.mq.ConnectionToken;
import org.jboss.mq.server.JMSDestination;
import org.jboss.mq.server.JMSServer;
-import org.jboss.mq.xml.XElement;
-
-import org.jboss.system.ServiceMBeanSupport;
+import org.jboss.mq.pm.TxManager;
+import org.jboss.mq.SpyMessage;
+import org.jboss.mq.SpyJMSException;
/**
* This class manages all persistence related services for file based
* persistence.
*
- * @created August 16, 2001
- * @author: Jayesh Parayali ([EMAIL PROTECTED])
- * @version $Revision: 1.3 $
+ * @author: Jayesh Parayali ([EMAIL PROTECTED])
+ *
+ * @version $Revision: 1.4 $
*/
-public class PersistenceManager extends org.jboss.system.ServiceMBeanSupport
implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager {
+public class PersistenceManager extends org.jboss.util.ServiceMBeanSupport
implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager {
+
+ protected static DataSource datasource;
// Log file used to store commited transactions.
- TxLog txLog;
+ TxLog txLog;
// Maps SpyDestinations to SpyMessageLogs
- HashMap messageLogs = new HashMap();
+ HashMap messageLogs= new HashMap();
// Maps (Long)txIds to LinkedList of AddFile tasks
- HashMap transactedTasks = new HashMap();
- TxManager txManager;
+ HashMap transactedTasks= new HashMap();
- private String jmsDBPoolName;
-
- protected static DataSource datasource;
-
+ static class LogInfo {
+ MessageLog log;
+ SpyDestination destination;
- /**
- * Insert the method's description here. Creation date: (6/27/2001 1:07:07
- * AM)
- *
- * @param newJmsDBPoolName java.lang.String
- */
- public void setJmsDBPoolName( java.lang.String newJmsDBPoolName ) {
- jmsDBPoolName = newJmsDBPoolName;
+ LogInfo(MessageLog log, SpyDestination destination) {
+ this.log= log;
+ this.destination= destination;
+ }
}
- /**
- * Insert the method's description here. Creation date: (6/27/2001 1:07:07
- * AM)
- *
- * @return java.lang.String
- */
- public java.lang.String getJmsDBPoolName() {
- return jmsDBPoolName;
+ class Transaction {
+ private LogInfo logInfo;
+ private SpyMessage message;
+ private org.jboss.mq.pm.Tx txId;
+ private boolean add;
+ public Transaction(boolean add, LogInfo logInfo, SpyMessage message,
org.jboss.mq.pm.Tx txId) {
+ this.add= add;
+ this.logInfo= logInfo;
+ this.message= message;
+ this.txId= txId;
+ }
+ public void commit() throws JMSException {
+ if (!add)
+ logInfo.log.remove(message, txId);
+ }
+ public void rollback() throws JMSException {
+ if (add)
+ logInfo.log.remove(message, txId);
+ }
}
- public String getName() {
- return "JBossMQ-PersistenceManager";
- }
+ private String jmsDBPoolName;
+ TxManager txManager;
/**
- * getTxManager method comment.
- *
- * @return The TxManager value
- */
- public org.jboss.mq.pm.TxManager getTxManager() {
- return txManager;
- }
-
-
- public void initService()
- throws Exception {
-
- //Get an InitialContext
- InitialContext ctx = new InitialContext();
- datasource = ( DataSource )ctx.lookup( jmsDBPoolName );
- txLog = new TxLog( datasource );
-
- JMSServer server = ( JMSServer )getServer().invoke( new ObjectName(
org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME ), "getJMSServer", new Object[]{},
new String[]{} );
- server.setPersistenceManager( this );
-
- }
-
- public void startService()
- throws Exception {
-
- JMSServer server = ( JMSServer )getServer().invoke( new ObjectName(
org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME ), "getJMSServer", new Object[]{},
new String[]{} );
- restore( server );
-
- }
-
-
- public void destroyQueue( SpyDestination dest )
- throws javax.jms.JMSException {
-
- try {
-
- //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
- //java.io.File file = new java.io.File(logDir.getFile());
-
- LogInfo logInfo;
- synchronized ( messageLogs ) {
- logInfo = ( LogInfo )messageLogs.remove( "" + dest );
- }
- if ( logInfo == null ) {
- throw new SpyJMSException( "The persistence log was never initialized"
);
- }
-
- logInfo.log.close();
- //file.delete();
-
- } catch ( javax.jms.JMSException e ) {
- throw e;
- } catch ( Exception e ) {
- javax.jms.JMSException newE = new javax.jms.JMSException( "Invalid
configuration." );
- newE.setLinkedException( e );
- throw newE;
- }
-
- }
-
- public void initQueue( SpyDestination dest )
- throws javax.jms.JMSException {
- try {
- //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
-
- MessageLog log = new MessageLog( datasource, dest.toString() );
-
- LogInfo info = new LogInfo( log, dest );
-
- synchronized ( messageLogs ) {
- messageLogs.put( "" + dest, info );
- }
-
- } catch ( javax.jms.JMSException e ) {
- throw e;
- } catch ( Exception e ) {
- javax.jms.JMSException newE = new javax.jms.JMSException( "Invalid
configuration." );
- newE.setLinkedException( e );
- throw newE;
- }
-
- }
-
-
- public void add( org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId )
- throws javax.jms.JMSException {
- LogInfo logInfo;
-
- synchronized ( messageLogs ) {
- logInfo = ( LogInfo )messageLogs.get( "" + message.getJMSDestination() );
- }
-
- if ( logInfo == null ) {
- throw new javax.jms.JMSException( "Destination was not initalized with the
PersistenceManager" );
- }
-
- logInfo.log.add( message, txId );
-
- if ( txId != null ) {
- LinkedList tasks;
- synchronized ( transactedTasks ) {
- tasks = ( LinkedList )transactedTasks.get( txId );
- }
- if ( tasks == null ) {
- throw new javax.jms.JMSException( "Transaction is not active 5." );
- }
- synchronized ( tasks ) {
- tasks.addLast( new Transaction( true, logInfo, message, txId ) );
- }
- }
- }
-
- public void commitPersistentTx( org.jboss.mq.pm.Tx txId )
- throws javax.jms.JMSException {
-
- LinkedList transacted;
- synchronized ( transactedTasks ) {
- transacted = ( LinkedList )transactedTasks.remove( txId );
- }
- synchronized ( transacted ) {
- Iterator iter = transacted.iterator();
- while ( iter.hasNext() ) {
- Transaction task = ( Transaction )iter.next();
- task.commit();
- }
- }
-
- txLog.commitTx( txId );
- }
-
- public org.jboss.mq.pm.Tx createPersistentTx()
- throws javax.jms.JMSException {
- org.jboss.mq.pm.Tx txId = txLog.createTx();
- synchronized ( transactedTasks ) {
- transactedTasks.put( txId, new LinkedList() );
- }
- return txId;
- }
-
- public void remove( org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId )
- throws javax.jms.JMSException {
- LogInfo logInfo;
-
- synchronized ( messageLogs ) {
- logInfo = ( LogInfo )messageLogs.get( "" + message.getJMSDestination() );
- }
-
- if ( logInfo == null ) {
- throw new javax.jms.JMSException( "Destination was not initalized with the
PersistenceManager" );
- }
-
- if ( txId == null ) {
- logInfo.log.remove( message, txId );
- } else {
- LinkedList tasks;
- synchronized ( transactedTasks ) {
- tasks = ( LinkedList )transactedTasks.get( txId );
- }
- if ( tasks == null ) {
- throw new javax.jms.JMSException( "Transaction is not active 6." );
- }
- synchronized ( tasks ) {
- tasks.addLast( new Transaction( false, logInfo, message, txId ) );
- }
- }
-
- }
-
- public void restore( org.jboss.mq.server.JMSServer server )
- throws javax.jms.JMSException {
-
- TreeSet committingTXs = txLog.restore();
- HashMap clone;
- synchronized ( messageLogs ) {
- clone = ( HashMap )messageLogs.clone();
- }
-
- Iterator iter = clone.values().iterator();
- while ( iter.hasNext() ) {
-
- LogInfo logInfo = ( LogInfo )iter.next();
-
- JMSDestination q = server.getJMSDestination( logInfo.destination );
-
- SpyMessage rebuild[] = logInfo.log.restore( committingTXs, q.toString() );
-
- //TODO: make sure this lock is good enough
- synchronized ( q ) {
- for ( int i = 0; i < rebuild.length; i++ ) {
- q.restoreMessage( rebuild[i] );
- }
- }
- }
-
- }
-
- public void rollbackPersistentTx( org.jboss.mq.pm.Tx txId )
- throws javax.jms.JMSException {
+ * Insert the method's description here.
+ * Creation date: (6/27/2001 1:07:07 AM)
+ * @return java.lang.String
+ */
+ public java.lang.String getJmsDBPoolName() {
+ return jmsDBPoolName;
+ }
- LinkedList transacted;
- synchronized ( transactedTasks ) {
- transacted = ( LinkedList )transactedTasks.remove( txId );
- }
- synchronized ( transacted ) {
- Iterator iter = transacted.iterator();
- while ( iter.hasNext() ) {
- Transaction task = ( Transaction )iter.next();
- task.rollback();
- }
- }
+ public String getName() {
+ return "JBossMQ-PersistenceManager";
+ }
- txLog.rollbackTx( txId );
- }
+ public void initService() throws Exception {
+ JMSServer server= (JMSServer) getServer().invoke(new
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {
+ }, new String[] {
+ });
+ server.setPersistenceManager(this);
+ }
/**
- * @created August 16, 2001
- */
- class Transaction {
- private LogInfo logInfo;
- private SpyMessage message;
- private org.jboss.mq.pm.Tx txId;
- private boolean add;
-
- public Transaction( boolean add, LogInfo logInfo, SpyMessage message,
org.jboss.mq.pm.Tx txId ) {
- this.add = add;
- this.logInfo = logInfo;
- this.message = message;
- this.txId = txId;
- }
-
- public void commit()
- throws JMSException {
- if ( !add ) {
- logInfo.log.remove( message, txId );
- }
- }
-
- public void rollback()
- throws JMSException {
- if ( add ) {
- logInfo.log.remove( message, txId );
- }
- }
- }
+ * Insert the method's description here.
+ * Creation date: (6/27/2001 1:07:07 AM)
+ * @param newJmsDBPoolName java.lang.String
+ */
+ public void setJmsDBPoolName(java.lang.String newJmsDBPoolName) {
+ jmsDBPoolName= newJmsDBPoolName;
+ }
+
+ public void startService() throws Exception {
+
+ //Get an InitialContext
+ InitialContext ctx= new InitialContext();
+ datasource= (DataSource) ctx.lookup(jmsDBPoolName);
+ txLog= new TxLog(datasource);
+
+ Iterator i= messageLogs.values().iterator();
+ while (i.hasNext()) {
+ LogInfo li= (LogInfo) i.next();
+ li.log.setDatasource(datasource);
+ }
+
+ JMSServer server= (JMSServer) getServer().invoke(new
ObjectName(org.jboss.mq.server.JBossMQServiceMBean.OBJECT_NAME), "getJMSServer", new
Object[] {
+ }, new String[] {
+ });
+ restore(server);
+
+ }
+
+ public void destroyQueue(SpyDestination dest) throws javax.jms.JMSException {
+
+ try {
+
+ //URL logDir = new URL(dataDirectory, dest.toString()+"-"+queueId);
+ //java.io.File file = new java.io.File(logDir.getFile());
+
+ LogInfo logInfo;
+ synchronized (messageLogs) {
+ logInfo= (LogInfo) messageLogs.remove("" + dest);
+ }
+ if (logInfo == null)
+ throw new SpyJMSException("The persistence log was never
initialized");
+
+ logInfo.log.close();
+ //file.delete();
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE= new javax.jms.JMSException("Invalid
configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ }
+
+ public void initQueue(SpyDestination dest) throws javax.jms.JMSException {
+ try {
+
+ MessageLog log= new MessageLog(dest.toString());
+ LogInfo info= new LogInfo(log, dest);
+
+ synchronized (messageLogs) {
+ messageLogs.put("" + dest, info);
+ }
+
+ } catch (javax.jms.JMSException e) {
+ throw e;
+ } catch (Exception e) {
+ javax.jms.JMSException newE= new javax.jms.JMSException("Invalid
configuration.");
+ newE.setLinkedException(e);
+ throw newE;
+ }
+
+ }
+
+ public void add(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException {
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination());
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not initalized with
the PersistenceManager");
+
+ logInfo.log.add(message, txId);
+
+ if (txId != null) {
+ LinkedList tasks;
+ synchronized (transactedTasks) {
+ tasks= (LinkedList) transactedTasks.get(txId);
+ }
+ if (tasks == null)
+ throw new javax.jms.JMSException("Transaction is not active
5.");
+ synchronized (tasks) {
+ tasks.addLast(new Transaction(true, logInfo, message, txId));
+ }
+ }
+
+ }
+
+ public void commitPersistentTx(org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException {
+
+ LinkedList transacted;
+ synchronized (transactedTasks) {
+ transacted= (LinkedList) transactedTasks.remove(txId);
+ }
+ synchronized (transacted) {
+ Iterator iter= transacted.iterator();
+ while (iter.hasNext()) {
+ Transaction task= (Transaction) iter.next();
+ task.commit();
+ }
+ }
+
+ txLog.commitTx(txId);
+ }
+
+ public org.jboss.mq.pm.Tx createPersistentTx() throws javax.jms.JMSException {
+ org.jboss.mq.pm.Tx txId= txLog.createTx();
+ synchronized (transactedTasks) {
+ transactedTasks.put(txId, new LinkedList());
+ }
+ return txId;
+ }
/**
- * @created August 16, 2001
- */
- static class LogInfo {
- MessageLog log;
- SpyDestination destination;
+ * getTxManager method comment.
+ */
+ public org.jboss.mq.pm.TxManager getTxManager() {
+ return txManager;
+ }
- LogInfo( MessageLog log, SpyDestination destination ) {
- this.log = log;
- this.destination = destination;
- }
- }
-}
+ public void remove(org.jboss.mq.SpyMessage message, org.jboss.mq.pm.Tx txId)
throws javax.jms.JMSException {
+ LogInfo logInfo;
+
+ synchronized (messageLogs) {
+ logInfo= (LogInfo) messageLogs.get("" + message.getJMSDestination());
+ }
+
+ if (logInfo == null)
+ throw new javax.jms.JMSException("Destination was not initalized with
the PersistenceManager");
+
+ if (txId == null)
+ logInfo.log.remove(message, txId);
+ else {
+ LinkedList tasks;
+ synchronized (transactedTasks) {
+ tasks= (LinkedList) transactedTasks.get(txId);
+ }
+ if (tasks == null)
+ throw new javax.jms.JMSException("Transaction is not active
6.");
+ synchronized (tasks) {
+ tasks.addLast(new Transaction(false, logInfo, message, txId));
+ }
+ }
+
+ }
+
+ public void restore(org.jboss.mq.server.JMSServer server) throws
javax.jms.JMSException {
+
+ TreeSet committingTXs= txLog.restore();
+ HashMap clone;
+ synchronized (messageLogs) {
+ clone= (HashMap) messageLogs.clone();
+ }
+
+ Iterator iter= clone.values().iterator();
+ while (iter.hasNext()) {
+
+ LogInfo logInfo= (LogInfo) iter.next();
+
+ JMSDestination q= server.getJMSDestination(logInfo.destination);
+
+ SpyMessage rebuild[]= logInfo.log.restore(committingTXs,
q.toString());
+
+ //TODO: make sure this lock is good enough
+ synchronized (q) {
+ for (int i= 0; i < rebuild.length; i++) {
+ q.restoreMessage(rebuild[i]);
+ }
+ }
+ }
+
+ }
+
+ public void rollbackPersistentTx(org.jboss.mq.pm.Tx txId) throws
javax.jms.JMSException {
+
+ LinkedList transacted;
+ synchronized (transactedTasks) {
+ transacted= (LinkedList) transactedTasks.remove(txId);
+ }
+ synchronized (transacted) {
+ Iterator iter= transacted.iterator();
+ while (iter.hasNext()) {
+ Transaction task= (Transaction) iter.next();
+ task.rollback();
+ }
+ }
+
+ txLog.rollbackTx(txId);
+ }
+}
\ No newline at end of file
1.3 +154 -176 jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java
Index: MessageLog.java
===================================================================
RCS file: /cvsroot/jboss/jbossmq/src/main/org/jboss/mq/pm/jdbc/MessageLog.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MessageLog.java 2001/08/17 03:04:05 1.2
+++ MessageLog.java 2001/08/31 04:39:08 1.3
@@ -5,31 +5,30 @@
* See terms of license at gnu.org.
*/
package org.jboss.mq.pm.jdbc;
-import java.io.*;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.io.Serializable;
+import java.io.FileOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+import java.io.File;
import java.sql.*;
+import java.io.*;
import javax.jms.JMSException;
import javax.sql.*;
import org.jboss.mq.SpyDestination;
-import org.jboss.mq.SpyJMSException;
import org.jboss.mq.SpyMessage;
+import org.jboss.mq.SpyJMSException;
/**
- * This is used to keep SpyMessages on the disk and is used reconstruct the
- * queue in case of provider failure.
+ * This is used to keep SpyMessages on the disk and is used reconstruct the
+ * queue in case of provider failure.
*
- * @created August 16, 2001
- * @author: Jayesh Parayali ([EMAIL PROTECTED])
- * @version $Revision: 1.2 $
+ * @author: Jayesh Parayali ([EMAIL PROTECTED])
+ * @version $Revision: 1.3 $
*/
public class MessageLog {
@@ -37,174 +36,153 @@
// Attributes
/////////////////////////////////////////////////////////////////////
//private File queueName;
- protected static DataSource datasource;
+ protected DataSource datasource;
/////////////////////////////////////////////////////////////////////
- // Constructor
+ // Public Methods
/////////////////////////////////////////////////////////////////////
- public MessageLog( DataSource datasource, String dest )
- throws JMSException {
- if ( this.datasource == null ) {
- this.datasource = datasource;
- }
- }
+ public void close() throws JMSException {
+ }
+
+ public SpyMessage[] restore(java.util.TreeSet comittingTXs, String dest) throws
JMSException {
+ String destin= dest.substring(21, dest.length());
+ java.util.TreeMap messageIndex= new java.util.TreeMap();
+ PreparedStatement pstmt= null;
+ ResultSet rs= null;
+ Connection con= null;
+
+ try {
+ con= datasource.getConnection();
+ pstmt= con.prepareStatement("select messageblob, messageid from
jms_messages where destination = ?");
+ pstmt.setString(1, destin);
+ rs= pstmt.executeQuery();
+
+ while (rs.next()) {
+ byte[] st= (byte[]) rs.getObject(1);
+ ByteArrayInputStream baip= new ByteArrayInputStream(st);
+ ObjectInputStream ois= new ObjectInputStream(baip);
+ // re-create the object
+ SpyMessage message= (SpyMessage) ois.readObject();
+
+ //Long msgId = new
Long(Long.parseLong(rs.getString(2).trim(),16));
+ //restore the messageId which is not persistent.
+ message.messageId= Long.parseLong(rs.getString(2).trim(), 16);
+ Long msgId= new Long(message.messageId);
+ messageIndex.put(msgId, message);
+ }
+ } catch (SQLException e) {
+ throwJMSException("SQL error while rebuilding the tranaction log.",
e);
+ } catch (Exception e) {
+ throwJMSException("Could not rebuild the queue from the queue's
tranaction log.", e);
+ } finally {
+ try {
+ if (rs != null)
+ rs.close();
+ if (pstmt != null)
+ pstmt.close();
+ if (con != null)
+ con.close();
+ } catch (SQLException e) {
+ throwJMSException("SQL error while closing the database
connection", e);
+ }
+ }
+
+ SpyMessage rc[]= new SpyMessage[messageIndex.size()];
+ java.util.Iterator iter= messageIndex.values().iterator();
+ for (int i= 0; iter.hasNext(); i++)
+ rc[i]= (SpyMessage) iter.next();
+ return rc;
+ }
+
+ private void throwJMSException(String message, Exception e) throws JMSException {
+ JMSException newE= new SpyJMSException(message);
+ newE.setLinkedException(e);
+ throw newE;
+ }
/////////////////////////////////////////////////////////////////////
- // Public Methods
+ // Constructor
/////////////////////////////////////////////////////////////////////
- public void close()
- throws JMSException {
- }
-
-
- public SpyMessage[] restore( java.util.TreeSet comittingTXs, String dest )
- throws JMSException {
- String destin = dest.substring( 21, dest.length() );
-
- java.util.TreeMap messageIndex = new java.util.TreeMap();
- PreparedStatement pstmt = null;
- ResultSet rs = null;
- Connection con = null;
-
- try {
- con = datasource.getConnection();
- pstmt =
- con.prepareStatement
- ( "select messageblob, messageid from jms_messages where destination
= ?" );
- pstmt.setString( 1, destin );
- rs = pstmt.executeQuery();
-
- while ( rs.next() ) {
- byte[] st = ( byte[] )rs.getObject( 1 );
- ByteArrayInputStream baip =
- new ByteArrayInputStream( st );
- ObjectInputStream ois =
- new ObjectInputStream( baip );
- // re-create the object
- SpyMessage message = ( SpyMessage )ois.readObject();
-
- //Long msgId = new Long(Long.parseLong(rs.getString(2).trim(),16));
- //restore the messageId which is not persistent.
- message.messageId = Long.parseLong( rs.getString( 2 ).trim(), 16 );
- Long msgId = new Long( message.messageId );
- messageIndex.put( msgId, message );
- }
- } catch ( SQLException e ) {
- throwJMSException( "SQL error while rebuilding the tranaction log.", e );
- } catch ( Exception e ) {
- throwJMSException( "Could not rebuild the queue from the queue's
tranaction log.", e );
- } finally {
- try {
- if ( rs != null ) {
- rs.close();
- }
- if ( pstmt != null ) {
- pstmt.close();
- }
- if ( con != null ) {
- con.close();
- }
- } catch ( SQLException e ) {
- throwJMSException( "SQL error while closing the database connection", e
);
- }
- }
-
- SpyMessage rc[] = new SpyMessage[messageIndex.size()];
- java.util.Iterator iter = messageIndex.values().iterator();
- for ( int i = 0; iter.hasNext(); i++ ) {
- rc[i] = ( SpyMessage )iter.next();
- }
- return rc;
- }
-
- public void add( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
- throws JMSException {
- PreparedStatement pstmt = null;
- Connection con = null;
-
- try {
- con = datasource.getConnection();
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream oos = new ObjectOutputStream( baos );
- oos.writeObject( message );
- byte[] messageAsBytes = baos.toByteArray();
- pstmt =
- con.prepareStatement
- ( "insert into jms_messages (messageid, destination, messageblob)
VALUES(?,?,?)" );
- ByteArrayInputStream bais =
- new ByteArrayInputStream( messageAsBytes );
- pstmt.setString( 2, ( ( SpyDestination )message.getJMSDestination()
).getName() );
- pstmt.setBinaryStream( 3, bais, messageAsBytes.length );
- String hexString = null;
- if ( message.messageId <= 0 ) {
- hexString = "-" + Long.toHexString( ( -1 ) * message.messageId );
- } else {
- hexString = Long.toHexString( message.messageId );
- }
-
- pstmt.setString( 1, hexString );
- pstmt.executeUpdate();
-
- pstmt.close();
- } catch ( IOException e ) {
- throwJMSException( "Could serialize the message.", e );
- } catch ( SQLException e ) {
- throwJMSException( "Could not write message to the database.", e );
- } finally {
- try {
- //if (pstmt != null)
- //pstmt.close();
- if ( con != null ) {
- con.close();
- }
- } catch ( SQLException e ) {
- throwJMSException( "Could not close the database.", e );
- }
-
- }
- }
-
- public void remove( SpyMessage message, org.jboss.mq.pm.Tx transactionId )
- throws JMSException {
- PreparedStatement pstmt = null;
- Connection con = null;
- try {
- con = datasource.getConnection();
- pstmt =
- con.prepareStatement
- ( "delete from jms_messages where messageid = ? and destination = ?"
);
- String hexString = null;
- if ( message.messageId <= 0 ) {
- hexString = "-" + Long.toHexString( ( -1 ) * message.messageId );
- } else {
- hexString = Long.toHexString( message.messageId );
- }
- pstmt.setString( 1, hexString );
- pstmt.setString( 2, ( ( SpyDestination )message.getJMSDestination()
).getName().trim() );
-
- pstmt.execute();
- } catch ( SQLException e ) {
- throwJMSException( "Could not remove the message.", e );
- } finally {
- try {
- if ( pstmt != null ) {
- pstmt.close();
- }
- if ( con != null ) {
- con.close();
- }
- } catch ( SQLException e ) {
- throwJMSException( "Could not close the database.", e );
- }
-
- }
- }
-
- private void throwJMSException( String message, Exception e )
- throws JMSException {
- JMSException newE = new SpyJMSException( message );
- newE.setLinkedException( e );
- throw newE;
- }
-}
+ public MessageLog(String dest) throws JMSException {
+ }
+
+ public void add(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws
JMSException {
+ PreparedStatement pstmt= null;
+ Connection con= null;
+
+ try {
+ con= datasource.getConnection();
+ ByteArrayOutputStream baos= new ByteArrayOutputStream();
+ ObjectOutputStream oos= new ObjectOutputStream(baos);
+ oos.writeObject(message);
+ byte[] messageAsBytes= baos.toByteArray();
+ pstmt= con.prepareStatement("insert into jms_messages (messageid,
destination, messageblob) VALUES(?,?,?)");
+ ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
+ pstmt.setString(2, ((SpyDestination)
message.getJMSDestination()).getName());
+ pstmt.setBinaryStream(3, bais, messageAsBytes.length);
+ String hexString= null;
+ if (message.messageId <= 0)
+ hexString= "-" + Long.toHexString((-1) * message.messageId);
+ else
+ hexString= Long.toHexString(message.messageId);
+
+ pstmt.setString(1, hexString);
+ pstmt.executeUpdate();
+
+ pstmt.close();
+ } catch (IOException e) {
+ throwJMSException("Could serialize the message.", e);
+ } catch (SQLException e) {
+ throwJMSException("Could not write message to the database.", e);
+ } finally {
+ try {
+ //if (pstmt != null)
+ //pstmt.close();
+ if (con != null)
+ con.close();
+ } catch (SQLException e) {
+ throwJMSException("Could not close the database.", e);
+ }
+
+ }
+ }
+
+ public javax.sql.DataSource getDatasource() {
+ return datasource;
+ }
+
+ public void remove(SpyMessage message, org.jboss.mq.pm.Tx transactionId) throws
JMSException {
+ PreparedStatement pstmt= null;
+ Connection con= null;
+ try {
+ con= datasource.getConnection();
+ pstmt= con.prepareStatement("delete from jms_messages where messageid
= ? and destination = ?");
+ String hexString= null;
+ if (message.messageId <= 0)
+ hexString= "-" + Long.toHexString((-1) * message.messageId);
+ else
+ hexString= Long.toHexString(message.messageId);
+ pstmt.setString(1, hexString);
+ pstmt.setString(2, ((SpyDestination)
message.getJMSDestination()).getName().trim());
+
+ pstmt.execute();
+ } catch (SQLException e) {
+ throwJMSException("Could not remove the message.", e);
+ } finally {
+ try {
+ if (pstmt != null)
+ pstmt.close();
+ if (con != null)
+ con.close();
+ } catch (SQLException e) {
+ throwJMSException("Could not close the database.", e);
+ }
+
+ }
+ }
+
+ public void setDatasource(javax.sql.DataSource newDatasource) {
+ datasource= newDatasource;
+ }
+}
\ No newline at end of file
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development