User: chirino 
  Date: 01/12/12 14:12:18

  Modified:    src/main/org/jboss/jms/asf StdServerSession.java
                        StdServerSessionPool.java
  Log:
  MDB update.  Now we support packing > 1 message in a session.  Each
  message will correctly be committed/rolled back in its own tx.  (Before, the
  group would be in a single tx.)  The TM association code has been moved into
  the onMessage() method.  The JBossMQ-specific localTx optimization interface
  has been replaced with a more generic XAResource interface to manage a local tx.
  
  Revision  Changes    Path
  1.10      +76 -93    jboss/src/main/org/jboss/jms/asf/StdServerSession.java
  
  Index: StdServerSession.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- StdServerSession.java     2001/09/25 03:34:46     1.9
  +++ StdServerSession.java     2001/12/12 22:12:18     1.10
  @@ -5,23 +5,23 @@
    * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
  -import java.lang.reflect.Method;
   
   import javax.jms.JMSException;
  +import javax.jms.Message;
  +import javax.jms.MessageListener;
   import javax.jms.ServerSession;
   import javax.jms.Session;
   import javax.jms.XASession;
  -
   import javax.naming.InitialContext;
  -
   import javax.transaction.Status;
   import javax.transaction.Transaction;
   import javax.transaction.TransactionManager;
   import javax.transaction.xa.XAResource;
  +import javax.transaction.xa.Xid;
   
  -import org.apache.log4j.Category;
  -
  +import org.jboss.logging.Logger;
   import org.jboss.tm.TransactionManagerService;
  +import org.jboss.tm.XidImpl;
   
   /**
    * An implementation of ServerSession. <p>
  @@ -31,33 +31,30 @@
    * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
    * @author    <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
    * @author    <a href="mailto:[EMAIL PROTECTED]";>Hiram Chirino</a> .
  - * @version   $Revision: 1.9 $
  + * @version   $Revision: 1.10 $
    */
   public class StdServerSession
  -       implements Runnable, ServerSession
  +   implements Runnable, ServerSession, MessageListener
   {
      /**
       * Instance logger.
       */
  -   private final Category log = Category.getInstance(this.getClass());
  +   static Logger log = Logger.getLogger(StdServerSession.class);
   
      /**
       * The server session pool which we belong to.
       */
      private StdServerSessionPool serverSessionPool;
  -   // = null;
   
      /**
       * Our session resource.
       */
      private Session session;
  -   // = null;
   
      /**
       * Our XA session resource.
       */
      private XASession xaSession;
  -   // = null;
   
      /**
       * The transaction manager that we will use for transactions.
  @@ -72,11 +69,17 @@
      private boolean useLocalTX;
   
      /**
  +    * The listener to delegate calls, to. In our case the container invoker.
  +    */
  +   private MessageListener delegateListener;
  +
  +   /**
       * Create a <tt>StdServerSession</tt> .
       *
       * @param pool              The server session pool which we belong to.
       * @param session           Our session resource.
       * @param xaSession         Our XA session resource.
  +    * @param delegateListener  Listener to call when messages arrives.
       * @param useLocalTX       Will this session be used in a global TX (we can 
optimize with 1 phase commit)
       * @throws JMSException     Transation manager was not found.
       * @exception JMSException  Description of Exception
  @@ -84,7 +87,8 @@
      StdServerSession(final StdServerSessionPool pool,
            final Session session,
            final XASession xaSession,
  -         final boolean useLocalTX)
  +      final MessageListener delegateListener,
  +         boolean useLocalTX)
             throws JMSException
      {
         // assert pool != null
  @@ -93,19 +97,20 @@
         this.serverSessionPool = pool;
         this.session = session;
         this.xaSession = xaSession;
  -
  -      try
  -      {
  -         this.useLocalTX = useLocalTX && 
Class.forName("org.jboss.mq.SpySession").isAssignableFrom(session.getClass());
  -      }
  -      catch (ClassNotFoundException e)
  -      {
  -         this.useLocalTX = false;
  -      }
  +      this.delegateListener = delegateListener;
  +      if( xaSession == null )
  +           useLocalTX = false;
  +      this.useLocalTX = useLocalTX;
   
         log.debug("initializing (pool, session, xaSession, useLocalTX): " +
               pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
   
  +      // Set out self as message listener
  +      if (xaSession != null)
  +              xaSession.setMessageListener(this);
  +      else
  +       session.setMessageListener(this);
  +
         InitialContext ctx = null;
         try
         {
  @@ -146,7 +151,10 @@
       */
      public Session getSession() throws JMSException
      {
  -      return session;
  +      if (xaSession != null)
  +              return xaSession;
  +      else
  +      return session;
      }
   
      //--- Protected parts, used by other in the package
  @@ -155,7 +163,20 @@
       * Runs in an own thread, basically calls the session.run(), it is up to the
       * session to have been filled with messages and it will run against the
       * listener set in StdServerSessionPool. When it has send all its messages it
  -    * returns. HC: run() also starts a transaction with the TransactionManager
  +    * returns.
  +    */
  +   public void run()
  +   {
  +      log.debug("running...");
  +      if (xaSession != null)
  +              xaSession.run();
  +      else
  +      session.run();
  +   }
  +   /**
  +    * Will get called from session for each message stuffed into it.
  +    *
  +    * Starts a transaction with the TransactionManager
       * and enlists the XAResource of the JMS XASession if a XASession was
       * available. A good JMS implementation should provide the XASession for use
       * in the ASF. So we optimize for the case where we have an XASession. So,
  @@ -164,15 +185,14 @@
       * leaving it this way since it keeps the code simpler and that case should
       * not be too common (JBossMQ provides XASessions).
       */
  -   public void run()
  -   {
  -      log.debug("running...");
  +   public void onMessage(Message msg) {
   
         log.info("running (pool, session, xaSession, useLocalTX): " +
               ", " + session + ", " + xaSession + ", " + useLocalTX);
   
         // Used if run with useLocalTX if true
  -      JBossMQTXInterface jbossMQTXInterface = null;
  +      Xid localXid = null;
  +      boolean localRollbackFlag=false;
   
         // Used if run with useLocalTX if false
         Transaction trans = null;
  @@ -182,8 +202,11 @@
            if (useLocalTX)
            {
               // Use JBossMQ One Phase Commit to commit the TX
  -            jbossMQTXInterface = new JBossMQTXInterface(session);
  -            jbossMQTXInterface.startTX();
  +            localXid = new XidImpl();
  +            XAResource res = xaSession.getXAResource();
  +            res.start(localXid, XAResource.TMNOFLAGS);
  +            
  +            if (log.isTraceEnabled()) log.trace("Using optimized 1p commit to 
control TX.");
   
            }
            else
  @@ -197,16 +220,15 @@
               {
                  XAResource res = xaSession.getXAResource();
                  trans.enlistResource(res);
  -               if (log.isDebugEnabled())
  -               {
  -                  log.debug("XAResource '" + res + "' enlisted.");
  -               }
  +               if (log.isTraceEnabled()) log.trace("XAResource '" + res + "' 
enlisted.");
               }
            }
            //currentTransactionId = connection.spyXAResourceManager.startTx();
   
            // run the session
  -         session.run();
  +         //session.run();
  +      // Call delegate listener
  +      delegateListener.onMessage(msg);
         }
         catch (Exception e)
         {
  @@ -215,7 +237,7 @@
            if (useLocalTX)
            {
               // Use JBossMQ One Phase Commit to commit the TX
  -            jbossMQTXInterface.setRollbackOnly();
  +            localRollbackFlag = true;
            }
            else
            {
  @@ -224,6 +246,7 @@
               try
               {
                  // The transaction will be rolledback in the finally
  +               if (log.isTraceEnabled()) log.trace("Using TM to mark TX for 
rollback.");
                  trans.setRollbackOnly();
               }
               catch (Exception x)
  @@ -239,8 +262,20 @@
            {
               if (useLocalTX)
               {
  -               // Use JBossMQ One Phase Commit to commit the TX
  -               jbossMQTXInterface.endTX();
  +             if( localRollbackFlag == true  ) {
  +                         if (log.isTraceEnabled()) log.trace("Using optimized 1p 
commit to rollback TX.");
  +                         
  +                         XAResource res = xaSession.getXAResource();
  +                         res.end(localXid, XAResource.TMSUCCESS);
  +                         res.rollback(localXid);
  +                         
  +             } else {
  +                         if (log.isTraceEnabled()) log.trace("Using optimized 1p 
commit to commit TX.");
  +                         
  +                         XAResource res = xaSession.getXAResource();
  +                         res.end(localXid, XAResource.TMSUCCESS);
  +                         res.commit(localXid, true);
  +             }               
   
               }
               else
  @@ -250,7 +285,7 @@
                  // Marked rollback
                  if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
                  {
  -                  log.info("Rolling back JMS transaction");
  +                  log.debug("Rolling back JMS transaction");
                     // actually roll it back
                     trans.rollback();
   
  @@ -268,6 +303,7 @@
                     // This will happen if
                     // a) everything goes well
                     // b) app. exception was thrown
  +                  if (log.isTraceEnabled()) log.trace("Commiting the JMS 
transaction");
                     trans.commit();
   
                     // NO XASession? then manually commit.  This is not so good but
  @@ -287,8 +323,7 @@
   
            StdServerSession.this.recycle();
         }
  -
  -      log.debug("done");
  +      if (log.isTraceEnabled()) log.trace("done");
      }
   
      /**
  @@ -358,59 +393,7 @@
         serverSessionPool.recycle(this);
      }
   
  -
  -   /**
  -    * #Description of the Class
  -    */
  -   private static class JBossMQTXInterface
  -   {
  -
  -      static boolean initialzied = false;
  -      static Method getXAResourceManager;
  -      static Method startTx;
  -      static Method endTx;
  -      static Method commit;
  -      static Method rollback;
  -      boolean doRollback = false;
  -      Object xid = null;
  -      Object spyXAResourceManager = null;
  -
  -      JBossMQTXInterface(Session sess) throws Exception
  -      {
  -         if (!initialzied)
  -         {
  -            getXAResourceManager = 
Class.forName("org.jboss.mq.SpySession").getMethod("getXAResourceManager", new 
Class[]{});
  -            startTx = 
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("startTx", new Class[]{});
  -            endTx = 
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("endTx", new 
Class[]{Object.class, boolean.class});
  -            commit = 
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("commit", new 
Class[]{Object.class, boolean.class});
  -            rollback = 
Class.forName("org.jboss.mq.SpyXAResourceManager").getMethod("rollback", new 
Class[]{Object.class});
  -            initialzied = true;
  -         }
  -         spyXAResourceManager = getXAResourceManager.invoke(sess, new Object[]{});
  -      }
   
  -      void setRollbackOnly()
  -      {
  -         doRollback = true;
  -      }
  +}
   
  -      void startTX() throws Exception
  -      {
  -         xid = startTx.invoke(spyXAResourceManager, new Object[]{});
  -      }
   
  -      void endTX() throws Exception
  -      {
  -         if (doRollback)
  -         {
  -            endTx.invoke(spyXAResourceManager, new Object[]{xid, new 
Boolean(true)});
  -            rollback.invoke(spyXAResourceManager, new Object[]{xid});
  -         }
  -         else
  -         {
  -            endTx.invoke(spyXAResourceManager, new Object[]{xid, new 
Boolean(true)});
  -            commit.invoke(spyXAResourceManager, new Object[]{xid, new 
Boolean(true)});
  -         }
  -      }
  -   }
  -}
  
  
  
  1.15      +21 -10    jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
  
  Index: StdServerSessionPool.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- StdServerSessionPool.java 2001/09/25 03:34:46     1.14
  +++ StdServerSessionPool.java 2001/12/12 22:12:18     1.15
  @@ -37,7 +37,7 @@
    *
    * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
    * @author    <a href="mailto:[EMAIL PROTECTED]";>Hiram Chirino</a> .
  - * @version   $Revision: 1.14 $
  + * @version   $Revision: 1.15 $
    */
   public class StdServerSessionPool
          implements ServerSessionPool
  @@ -112,11 +112,11 @@
      /**
       * Construct a <tt>StdServerSessionPool</tt> using the default pool size.
       *
  -    * @param con
  -    * @param transacted
  -    * @param ack
  -    * @param listener
  -    * @param maxSession
  +    * @param con connection to get sessions from
  +    * @param transacted transaction mode when not XA (
  +    * @param ack ackmode when not XA
  +    * @param listener the listener the sessions will call
  +    * @param maxSession maximum number of sessions in the pool
       * @param isuseLocalTX  Description of Parameter
       * @exception JMSException    Description of Exception
       */
  @@ -360,16 +360,27 @@
               throw new JMSException("Connection was not reconizable: " + con);
            }
   
  +
  +
  +         // create the server session and add it to the pool - it is up to the
  +      // server session to set the listener
  +         StdServerSession serverSession = new StdServerSession(this, ses, xaSes, 
listener, useLocalTX);
  +
  +
            // This might not be totaly spec compliant since it says that app
            // server should create as many message listeners its needs.
  -         log.debug("setting session listener: " + listener);
  -         ses.setMessageListener(listener);
  +         //log.debug("setting session listener: " + listener);
  +      //if(xaSession
  +      //ses.setMessageListener(serverSession);
  +      //FIXME, it seems as if Sonic is using ites XaSession to do all work
  +      //if (xaSes != null)
  +      //   xaSes.setMessageListener(serverSession);
   
  -         // create the server session and add it to the pool
  -         ServerSession serverSession = new StdServerSession(this, ses, xaSes, 
useLocalTX);
            sessionPool.add(serverSession);
            numServerSessions++;
            log.debug("added server session to the pool: " + serverSession);
         }
      }
   }
  +
  +
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to