User: hiram
Date: 01/01/08 13:57:08
Modified: src/main/org/jboss/jms/asf StdServerSessionPool.java
StdServerSession.java
Log:
Transactional support added for MDBs. Message receipt is now
part of the transaction for a CMT bean with Required transactions.
Hopefully this did not break anything else with MDBs.
Revision Changes Path
1.2 +25 -7 jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Index: StdServerSessionPool.java
===================================================================
RCS file: /products/cvs/ejboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- StdServerSessionPool.java 2000/12/27 22:45:53 1.1
+++ StdServerSessionPool.java 2001/01/08 21:57:08 1.2
@@ -25,8 +25,13 @@
import javax.jms.ServerSessionPool;
import javax.jms.MessageListener;
import javax.jms.TopicConnection;
+import javax.jms.XATopicConnection;
import javax.jms.QueueConnection;
+import javax.jms.XAQueueConnection;
import javax.jms.Session;
+import javax.jms.XASession;
+import javax.jms.XAQueueSession;
+import javax.jms.XATopicSession;
import org.jboss.logging.Logger;
/**
@@ -50,6 +55,11 @@
private ThreadPool threadPool = new ThreadPool();
private Vector sessionPool = new Vector();
+ boolean isTransacted() {
+ return transacted;
+ }
+
+
/**
* Minimal constructor, could also have stuff for pool size
*/
@@ -125,15 +135,23 @@
try {
// Here is the meat, that MUST follow the spec
Session ses = null;
- if (con instanceof TopicConnection) {
- ses = ((TopicConnection)con).createTopicSession(transacted, ack);
+ XASession xaSes = null;
+ if (con instanceof XATopicConnection) {
+ xaSes = ((XATopicConnection)con).createXATopicSession();
+ ses = ((XATopicSession)xaSes).getTopicSession();
+ } else if(con instanceof XAQueueConnection) {
+ xaSes = ((XAQueueConnection)con).createXAQueueSession();
+ ses = ((XAQueueSession)xaSes).getQueueSession();
+ } else if (con instanceof TopicConnection) {
+ ses = ((TopicConnection)con).createTopicSession(transacted, ack);
+ Logger.error("WARNING: Using a non-XA TopicConnection. It will not be able to participate in a Global UOW");
} else if(con instanceof QueueConnection) {
- ses = ((QueueConnection)con).createQueueSession(transacted, ack);
- Logger.debug("Creating a QueueSession" + ses);
+ ses = ((QueueConnection)con).createQueueSession(transacted, ack);
+ Logger.error("WARNING: Using a non-XA QueueConnection. It will not be able to participate in a Global UOW");
} else {
- Logger.debug("Error in getting session for con" + con);
- throw new JMSException("Connection was not reconizable: " + con);
+ Logger.debug("Error in getting session for con: " + con);
+ throw new JMSException("Connection was not reconizable: " + con);
}
// This might not be totala spec compliant since it
@@ -142,7 +160,7 @@
Logger.debug("Setting listener for session");
ses.setMessageListener(listener);
sessionPool.addElement(
- new StdServerSession(this, ses)
+ new StdServerSession(this, ses, xaSes)
);
}
catch (JMSException exception){
1.2 +98 -5 jboss/src/main/org/jboss/jms/asf/StdServerSession.java
Index: StdServerSession.java
===================================================================
RCS file: /products/cvs/ejboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- StdServerSession.java 2000/12/27 22:45:53 1.1
+++ StdServerSession.java 2001/01/08 21:57:08 1.2
@@ -22,7 +22,14 @@
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.Session;
+import javax.jms.XASession;
+import javax.naming.InitialContext;
+import javax.transaction.Status;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+
import org.jboss.logging.Logger;
/**
* StdServerSession.java
@@ -37,11 +44,22 @@
public class StdServerSession implements Runnable, ServerSession {
private StdServerSessionPool serverSessionPool = null;
private Session session = null;
+ private XASession xaSession = null;
+ private TransactionManager tm;
+
- StdServerSession(StdServerSessionPool pool, Session session) throws JMSException{
+ StdServerSession(StdServerSessionPool pool, Session session, XASession xaSession) throws JMSException{
serverSessionPool = pool;
this.session = session;
+ this.xaSession = xaSession;
+
+ try {
+ tm = (TransactionManager)new InitialContext().lookup("java:/TransactionManager");
+ } catch ( Exception e ) {
+ throw new JMSException("Transation Manager was not found");
+ }
+
}
// --- Impl of JMS standard API
@@ -79,24 +97,99 @@
* to the session to have been filled with messages and it will run
* against the listener set in StdServerSessionPool. When it has send
* all its messages it returns.
+ *
+ * HC: run() also starts a transaction with the TransactionManager and
+ * enlists the XAResource of the JMS XASession if a XASession was abvailable.
+ * A good JMS implementation should provide the XASession for use in the ASF.
+ * So we optimize for the case where we have an XASession. So, for the case
+ * where we do not have an XASession and the bean is not transacted, we
+ * have the unneeded overhead of creating a Transaction. I'm leaving it
+ * this way since it keeps the code simpler and that case should not be too
+ * common (spyderMQ provides XASessions).
+ *
*/
public void run() {
+
+ Transaction trans=null;
+
try {
+
Logger.debug("Invoking run on session");
+
+ Logger.debug("Starting the Message Driven Bean transaction");
+ tm.begin();
+ trans = tm.getTransaction();
+
+ if( xaSession != null ) {
+
+ XAResource res = xaSession.getXAResource();
+ trans.enlistResource(res);
+ Logger.debug("XAResource '"+res+"' enlisted.");
+
+ }
+
session.run();
+
}catch (Exception ex) {
- // Log error
- }finally {
+
+ Logger.exception( ex );
+
+ try {
+ // The transaction will be rolledback in the finally
+ trans.setRollbackOnly();
+ } catch( Exception e ) {
+ Logger.exception( e );
+ }
+
+ } finally {
+
+
+ try {
+
+ Logger.debug("Ending the Message Driven Bean transaction");
+
+ // Marked rollback
+ if ( trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
+
+ // actually roll it back
+ trans.rollback();
+
+ // NO XASession? then manually rollback. This is not so good but
+ // it's the best we can do if we have no XASession.
+ if( xaSession==null && serverSessionPool.isTransacted() )
+ session.rollback();
+
+ } else if(trans.getStatus() == Status.STATUS_ACTIVE) {
+
+ // Commit tx
+ // This will happen if
+ // a) everything goes well
+ // b) app. exception was thrown
+
+ trans.commit();
+
+ // NO XASession? then manually commit. This is not so good but
+ // it's the best we can do if we have no XASession.
+ if( xaSession==null && serverSessionPool.isTransacted() )
+ session.commit();
+
+ }
+
+ } catch(Exception e) {
+ // There was a problem doing the commit/rollback.
+ Logger.exception(e);
+ }
+
StdServerSession.this.recycle();
}
}
-
+
/**
* This method is called by the ServerSessionPool when it is ready to
* be recycled intot the pool
*/
void recycle()
{
- serverSessionPool.recycle(this);
+ serverSessionPool.recycle(this);
}
} // StdServerSession