User: chirino Date: 01/12/12 14:12:18 Modified: src/main/org/jboss/jms/asf StdServerSession.java StdServerSessionPool.java Log: MDB update. Now we support packing > 1 message in a session. Each message will correctly be committed/rolled back in its own tx. (Before, the group would be in a single tx.) The TM associating code has been moved into the onMessage() method. The JBossMQ-specific localTx optimization interface has been replaced with a more generic XAResource interface to manage a local tx. Revision Changes Path 1.10 +76 -93 jboss/src/main/org/jboss/jms/asf/StdServerSession.java Index: StdServerSession.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- StdServerSession.java 2001/09/25 03:34:46 1.9 +++ StdServerSession.java 2001/12/12 22:12:18 1.10 @@ -5,23 +5,23 @@ * See terms of license at gnu.org. */ package org.jboss.jms.asf; -import java.lang.reflect.Method; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; import javax.jms.ServerSession; import javax.jms.Session; import javax.jms.XASession; - import javax.naming.InitialContext; - import javax.transaction.Status; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; +import javax.transaction.xa.Xid; -import org.apache.log4j.Category; - +import org.jboss.logging.Logger; import org.jboss.tm.TransactionManagerService; +import org.jboss.tm.XidImpl; /** * An implementation of ServerSession. <p> @@ -31,33 +31,30 @@ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> . * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a> * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> . 
- * @version $Revision: 1.9 $ + * @version $Revision: 1.10 $ */ public class StdServerSession - implements Runnable, ServerSession + implements Runnable, ServerSession, MessageListener { /** * Instance logger. */ - private final Category log = Category.getInstance(this.getClass()); + static Logger log = Logger.getLogger(StdServerSession.class); /** * The server session pool which we belong to. */ private StdServerSessionPool serverSessionPool; - // = null; /** * Our session resource. */ private Session session; - // = null; /** * Our XA session resource. */ private XASession xaSession; - // = null; /** * The transaction manager that we will use for transactions. @@ -72,11 +69,17 @@ private boolean useLocalTX; /** + * The listener to delegate calls, to. In our case the container invoker. + */ + private MessageListener delegateListener; + + /** * Create a <tt>StdServerSession</tt> . * * @param pool The server session pool which we belong to. * @param session Our session resource. * @param xaSession Our XA session resource. + * @param delegateListener Listener to call when messages arrives. * @param useLocalTX Will this session be used in a global TX (we can optimize with 1 phase commit) * @throws JMSException Transation manager was not found. 
* @exception JMSException Description of Exception @@ -84,7 +87,8 @@ StdServerSession(final StdServerSessionPool pool, final Session session, final XASession xaSession, - final boolean useLocalTX) + final MessageListener delegateListener, + boolean useLocalTX) throws JMSException { // assert pool != null @@ -93,19 +97,20 @@ this.serverSessionPool = pool; this.session = session; this.xaSession = xaSession; - - try - { - this.useLocalTX = useLocalTX && Class.forName("org.jboss.mq.SpySession").isAssignableFrom(session.getClass()); - } - catch (ClassNotFoundException e) - { - this.useLocalTX = false; - } + this.delegateListener = delegateListener; + if( xaSession == null ) + useLocalTX = false; + this.useLocalTX = useLocalTX; log.debug("initializing (pool, session, xaSession, useLocalTX): " + pool + ", " + session + ", " + xaSession + ", " + useLocalTX); + // Set out self as message listener + if (xaSession != null) + xaSession.setMessageListener(this); + else + session.setMessageListener(this); + InitialContext ctx = null; try { @@ -146,7 +151,10 @@ */ public Session getSession() throws JMSException { - return session; + if (xaSession != null) + return xaSession; + else + return session; } //--- Protected parts, used by other in the package @@ -155,7 +163,20 @@ * Runs in an own thread, basically calls the session.run(), it is up to the * session to have been filled with messages and it will run against the * listener set in StdServerSessionPool. When it has send all its messages it - * returns. HC: run() also starts a transaction with the TransactionManager + * returns. + */ + public void run() + { + log.debug("running..."); + if (xaSession != null) + xaSession.run(); + else + session.run(); + } + /** + * Will get called from session for each message stuffed into it. + * + * Starts a transaction with the TransactionManager * and enlists the XAResource of the JMS XASession if a XASession was * available. 
A good JMS implementation should provide the XASession for use * in the ASF. So we optimize for the case where we have an XASession. So, @@ -164,15 +185,14 @@ * leaving it this way since it keeps the code simpler and that case should * not be too common (JBossMQ provides XASessions). */ - public void run() - { - log.debug("running..."); + public void onMessage(Message msg) { log.info("running (pool, session, xaSession, useLocalTX): " + ", " + session + ", " + xaSession + ", " + useLocalTX); // Used if run with useLocalTX if true - JBossMQTXInterface jbossMQTXInterface = null; + Xid localXid = null; + boolean localRollbackFlag=false; // Used if run with useLocalTX if false Transaction trans = null; @@ -182,8 +202,11 @@ if (useLocalTX) { // Use JBossMQ One Phase Commit to commit the TX - jbossMQTXInterface = new JBossMQTXInterface(session); - jbossMQTXInterface.startTX(); + localXid = new XidImpl(); + XAResource res = xaSession.getXAResource(); + res.start(localXid, XAResource.TMNOFLAGS); + + if (log.isTraceEnabled()) log.trace("Using optimized 1p commit to control TX."); } else @@ -197,16 +220,15 @@ { XAResource res = xaSession.getXAResource(); trans.enlistResource(res); - if (log.isDebugEnabled()) - { - log.debug("XAResource '" + res + "' enlisted."); - } + if (log.isTraceEnabled()) log.trace("XAResource '" + res + "' enlisted."); } } //currentTransactionId = connection.spyXAResourceManager.startTx(); // run the session - session.run(); + //session.run(); + // Call delegate listener + delegateListener.onMessage(msg); } catch (Exception e) { @@ -215,7 +237,7 @@ if (useLocalTX) { // Use JBossMQ One Phase Commit to commit the TX - jbossMQTXInterface.setRollbackOnly(); + localRollbackFlag = true; } else { @@ -224,6 +246,7 @@ try { // The transaction will be rolledback in the finally + if (log.isTraceEnabled()) log.trace("Using TM to mark TX for rollback."); trans.setRollbackOnly(); } catch (Exception x) @@ -239,8 +262,20 @@ { if (useLocalTX) { - // Use JBossMQ One 
Phase Commit to commit the TX - jbossMQTXInterface.endTX(); + if( localRollbackFlag == true ) { + if (log.isTraceEnabled()) log.trace("Using optimized 1p commit to rollback TX."); + + XAResource res = xaSession.getXAResource(); + res.end(localXid, XAResource.TMSUCCESS); + res.rollback(localXid); + + } else { + if (log.isTraceEnabled()) log.trace("Using optimized 1p commit to commit TX."); + + XAResource res = xaSession.getXAResource(); + res.end(localXid, XAResource.TMSUCCESS); + res.commit(localXid, true); + } } else @@ -250,7 +285,7 @@ // Marked rollback if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) { - log.info("Rolling back JMS transaction"); + log.debug("Rolling back JMS transaction"); // actually roll it back trans.rollback(); @@ -268,6 +303,7 @@ // This will happen if // a) everything goes well // b) app. exception was thrown + if (log.isTraceEnabled()) log.trace("Commiting the JMS transaction"); trans.commit(); // NO XASession? then manually commit. This is not so good but @@ -287,8 +323,7 @@ StdServerSession.this.recycle(); } - - log.debug("done"); + if (log.isTraceEnabled()) log.trace("done"); } /** @@ -358,59 +393,7 @@ serverSessionPool.recycle(this); } - - /** - * #Description of the Class - */ - private static class JBossMQTXInterface - { - - static boolean initialzied = false; - static Method getXAResourceManager; - static Method startTx; - static Method endTx; - static Method commit; - static Method rollback; - boolean doRollback = false; - Object xid = null; - Object spyXAResourceManager = null; - - JBossMQTXInterface(Session sess) throws Exception - { - if (!initialzied) - { - getXAResourceManager = Class.forName("org.jboss.mq.SpySession").getMethod("getXAResourceManager", new Class[]{}); - startTx = Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("startTx", new Class[]{}); - endTx = Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("endTx", new Class[]{Object.class, boolean.class}); - commit = 
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("commit", new Class[]{Object.class, boolean.class}); - rollback = Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("rollback", new Class[]{Object.class}); - initialzied = true; - } - spyXAResourceManager = getXAResourceManager.invoke(sess, new Object[]{}); - } - void setRollbackOnly() - { - doRollback = true; - } +} - void startTX() throws Exception - { - xid = startTx.invoke(spyXAResourceManager, new Object[]{}); - } - void endTX() throws Exception - { - if (doRollback) - { - endTx.invoke(spyXAResourceManager, new Object[]{xid, new Boolean(true)}); - rollback.invoke(spyXAResourceManager, new Object[]{xid}); - } - else - { - endTx.invoke(spyXAResourceManager, new Object[]{xid, new Boolean(true)}); - commit.invoke(spyXAResourceManager, new Object[]{xid, new Boolean(true)}); - } - } - } -} 1.15 +21 -10 jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java Index: StdServerSessionPool.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v retrieving revision 1.14 retrieving revision 1.15 diff -u -r1.14 -r1.15 --- StdServerSessionPool.java 2001/09/25 03:34:46 1.14 +++ StdServerSessionPool.java 2001/12/12 22:12:18 1.15 @@ -37,7 +37,7 @@ * * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> . * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> . - * @version $Revision: 1.14 $ + * @version $Revision: 1.15 $ */ public class StdServerSessionPool implements ServerSessionPool @@ -112,11 +112,11 @@ /** * Construct a <tt>StdServerSessionPool</tt> using the default pool size. 
* - * @param con - * @param transacted - * @param ack - * @param listener - * @param maxSession + * @param con connection to get sessions from + * @param transacted transaction mode when not XA ( + * @param ack ackmode when not XA + * @param listener the listener the sessions will call + * @param maxSession maximum number of sessions in the pool * @param isuseLocalTX Description of Parameter * @exception JMSException Description of Exception */ @@ -360,16 +360,27 @@ throw new JMSException("Connection was not reconizable: " + con); } + + + // create the server session and add it to the pool - it is up to the + // server session to set the listener + StdServerSession serverSession = new StdServerSession(this, ses, xaSes, listener, useLocalTX); + + // This might not be totaly spec compliant since it says that app // server should create as many message listeners its needs. - log.debug("setting session listener: " + listener); - ses.setMessageListener(listener); + //log.debug("setting session listener: " + listener); + //if(xaSession + //ses.setMessageListener(serverSession); + //FIXME, it seems as if Sonic is using ites XaSession to do all work + //if (xaSes != null) + // xaSes.setMessageListener(serverSession); - // create the server session and add it to the pool - ServerSession serverSession = new StdServerSession(this, ses, xaSes, useLocalTX); sessionPool.add(serverSession); numServerSessions++; log.debug("added server session to the pool: " + serverSession); } } } + +
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development