User: starksm 
  Date: 01/12/13 19:22:11

  Modified:    src/main/org/jboss/jms/asf Tag: Branch_2_4
                        ServerSessionPoolFactory.java
                        ServerSessionPoolLoader.java
                        ServerSessionPoolLoaderMBean.java
                        StdServerSession.java StdServerSessionPool.java
                        StdServerSessionPoolFactory.java
  Log:
  Integrate changes from 3.0 to improve the MDB/ASF layer. This includes
  support for the dead message queue for repeated MDB.onMessage failures.
  
  Revision  Changes    Path
  No                   revision
  
  
  No                   revision
  
  
  1.1.6.3   +20 -29    jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java
  
  Index: ServerSessionPoolFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java,v
  retrieving revision 1.1.6.2
  retrieving revision 1.1.6.3
  diff -u -r1.1.6.2 -r1.1.6.3
  --- ServerSessionPoolFactory.java     2001/08/23 03:58:15     1.1.6.2
  +++ ServerSessionPoolFactory.java     2001/12/14 03:22:11     1.1.6.3
  @@ -1,67 +1,58 @@
   /*
  - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
  + * JBoss, the OpenSource J2EE webOS
    *
  - * This library is free software; you can redistribute it and/or
  - * modify it under the terms of the GNU Lesser General Public
  - * License as published by the Free Software Foundation; either
  - * version 2 of the License, or (at your option) any later version
  - * 
  - * This library is distributed in the hope that it will be useful,
  - * but WITHOUT ANY WARRANTY; without even the implied warranty of
  - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  - * Lesser General Public License for more details.
  - * 
  - * You should have received a copy of the GNU Lesser General Public
  - * License along with this library; if not, write to the Free Software
  - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
   
   import javax.jms.Connection;
  +import javax.jms.JMSException;
   import javax.jms.MessageListener;
   import javax.jms.ServerSessionPool;
  -import javax.jms.JMSException;
   
   /**
  - * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects.
  + * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects. <p>
    *
  - * <p>Created: Wed Nov 29 15:55:21 2000
  + * Created: Wed Nov 29 15:55:21 2000
    *
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
  - * @version $Revision: 1.1.6.2 $
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Hiram Chirino</a> .
  + * @version   $Revision: 1.1.6.3 $
    */
   public interface ServerSessionPoolFactory
   {
      /**
       * Set the name of the factory.
       *
  -    * @param name    The name of the factory.
  +    * @param name  The name of the factory.
       */
      void setName(String name);
   
      /**
       * Get the name of the factory.
       *
  -    * @return    The name of the factory.
  +    * @return   The name of the factory.
       */
      String getName();
   
      /**
  -    * Create a new <tt>ServerSessionPool</tt>.
  +    * Create a new <tt>ServerSessionPool</tt> .
       *
       * @param con
       * @param maxSession
       * @param isTransacted
       * @param ack
       * @param listener
  -    * @return                A new pool.
  -    *
  +    * @param useLocalTX  
  +    * @return                    A new pool.
       * @throws JMSException
       */
      ServerSessionPool getServerSessionPool(Connection con,
  -                                          int maxSession,
  -                                          boolean isTransacted,
  -                                          int ack,
  -                                          MessageListener listener)
  -      throws JMSException;
  +         int maxSession,
  +         boolean isTransacted,
  +         int ack,
  +         boolean useLocalTX,
  +         MessageListener listener)
  +          throws JMSException;
   }
  
  
  
  1.2.6.3   +6 -22     jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java
  
  Index: ServerSessionPoolLoader.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java,v
  retrieving revision 1.2.6.2
  retrieving revision 1.2.6.3
  diff -u -r1.2.6.2 -r1.2.6.3
  --- ServerSessionPoolLoader.java      2001/08/23 03:58:15     1.2.6.2
  +++ ServerSessionPoolLoader.java      2001/12/14 03:22:11     1.2.6.3
  @@ -27,8 +27,6 @@
   import javax.naming.NamingException;
   import javax.naming.NameNotFoundException;
   
  -import org.apache.log4j.Category;
  -
   import org.jboss.util.ServiceMBeanSupport;
   
   /**
  @@ -38,15 +36,12 @@
    *
    * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
    * @author <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
  - * @version $Revision: 1.2.6.2 $
  + * @version $Revision: 1.2.6.3 $
    */
   public class ServerSessionPoolLoader 
      extends ServiceMBeanSupport
      implements ServerSessionPoolLoaderMBean
   {
  -   /** Instance logger. */
  -   private final Category log = Category.getInstance(this.getClass());
  -
      /** The factory used to create server session pools. */
      private ServerSessionPoolFactory poolFactory;
   
  @@ -121,22 +116,6 @@
         return name;
      }
   
  -   /**
  -    * Initialize the service.
  -    *
  -    * <p>Setup the pool factory.
  -    *
  -    * @throws ClassNotFoundException   Could not find pool factory class.
  -    * @throws Exception                Failed to create pool factory instance.
  -    */
  -   protected void initService() throws Exception
  -   {
  -      Class cls = Class.forName(poolFactoryClass);
  -      poolFactory = (ServerSessionPoolFactory)cls.newInstance();
  -      poolFactory.setName(name);
  -
  -      log.debug("initalized with pool factory: " + poolFactory);
  -   }
   
      /**
       * Start the service.
  @@ -147,6 +126,11 @@
       */
      protected void startService() throws Exception
      {
  +      Class cls = Class.forName(poolFactoryClass);
  +      poolFactory = (ServerSessionPoolFactory)cls.newInstance();
  +      poolFactory.setName(name);
  +
  +      log.debug("initialized with pool factory: " + poolFactory);
         InitialContext ctx = new InitialContext();
         String name = poolFactory.getName();
         String jndiname = "java:/" + name;
  
  
  
  1.2.6.3   +1 -1      jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java
  
  Index: ServerSessionPoolLoaderMBean.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java,v
  retrieving revision 1.2.6.2
  retrieving revision 1.2.6.3
  diff -u -r1.2.6.2 -r1.2.6.3
  --- ServerSessionPoolLoaderMBean.java 2001/08/23 03:58:15     1.2.6.2
  +++ ServerSessionPoolLoaderMBean.java 2001/12/14 03:22:11     1.2.6.3
  @@ -25,7 +25,7 @@
    * <p>Created: Wed Nov 29 16:20:17 2000
    *
    * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
  - * @version $Revision: 1.2.6.2 $
  + * @version $Revision: 1.2.6.3 $
    */
   public interface ServerSessionPoolLoaderMBean 
      extends ServiceMBean 
  
  
  
  1.4.6.3   +307 -161  jboss/src/main/org/jboss/jms/asf/StdServerSession.java
  
  Index: StdServerSession.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v
  retrieving revision 1.4.6.2
  retrieving revision 1.4.6.3
  diff -u -r1.4.6.2 -r1.4.6.3
  --- StdServerSession.java     2001/08/23 03:58:15     1.4.6.2
  +++ StdServerSession.java     2001/12/14 03:22:11     1.4.6.3
  @@ -1,77 +1,94 @@
   /*
  - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
  + * JBoss, the OpenSource J2EE webOS
    *
  - * This library is free software; you can redistribute it and/or
  - * modify it under the terms of the GNU Lesser General Public
  - * License as published by the Free Software Foundation; either
  - * version 2 of the License, or (at your option) any later version
  - * 
  - * This library is distributed in the hope that it will be useful,
  - * but WITHOUT ANY WARRANTY; without even the implied warranty of
  - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  - * Lesser General Public License for more details.
  - * 
  - * You should have received a copy of the GNU Lesser General Public
  - * License along with this library; if not, write to the Free Software
  - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
   
   import javax.jms.JMSException;
  +import javax.jms.Message;
  +import javax.jms.MessageListener;
   import javax.jms.ServerSession;
   import javax.jms.Session;
   import javax.jms.XASession;
  -
   import javax.naming.InitialContext;
  -
   import javax.transaction.Status;
   import javax.transaction.Transaction;
   import javax.transaction.TransactionManager;
   import javax.transaction.xa.XAResource;
  -
  -import org.apache.log4j.Category;
  +import javax.transaction.xa.Xid;
   
  +import org.jboss.logging.Logger;
   import org.jboss.tm.TransactionManagerService;
  +import org.jboss.tm.XidImpl;
   
   /**
  - * An implementation of ServerSession.
  + * An implementation of ServerSession. <p>
    *
  - * <p>Created: Thu Dec  7 18:25:40 2000
  + * Created: Thu Dec 7 18:25:40 2000
    *
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
  - * @version $Revision: 1.4.6.2 $
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Jason Dillon</a>
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Hiram Chirino</a> .
  + * @version   $Revision: 1.4.6.3 $
    */
   public class StdServerSession
  -   implements Runnable, ServerSession
  +   implements Runnable, ServerSession, MessageListener
   {
  -   /** Instance logger. */
  -   private final Category log = Category.getInstance(this.getClass());
  -
  -   /** The server session pool which we belong to. */
  -   private StdServerSessionPool serverSessionPool; // = null;
  -
  -   /** Our session resource. */
  -   private Session session; // = null;
  -
  -   /** Our XA session resource. */
  -   private XASession xaSession; // = null;
  -
  -   /** The transaction manager that we will use for transactions. */
  +   /**
  +    * Instance logger.
  +    */
  +   static Logger log = Logger.getLogger(StdServerSession.class);
  +   
  +   /**
  +    * The server session pool which we belong to.
  +    */
  +   private StdServerSessionPool serverSessionPool;
  +   
  +   /**
  +    * Our session resource.
  +    */
  +   private Session session;
  +   
  +   /**
  +    * Our XA session resource.
  +    */
  +   private XASession xaSession;
  +   
  +   /**
  +    * The transaction manager that we will use for transactions.
  +    */
      private TransactionManager tm;
  -
  +   
      /**
  -    * Create a <tt>StdServerSession</tt>.
  -    *
  -    * @param pool         The server session pool which we belong to.
  -    * @param session      Our session resource.
  -    * @param xaSession    Our XA session resource.
  +    * Use the session's XAResource directly if we have a JBossMQ XASession.
  +    * This allows us to get around the TX timeout problem when you have
  +    * extensive message processing.
  +    */
  +   private boolean useLocalTX;
  +   
  +   /**
  +    * The listener to delegate calls, to. In our case the container invoker.
  +    */
  +   private MessageListener delegateListener;
  +   
  +   /**
  +    * Create a <tt>StdServerSession</tt> .
       *
  -    * @throws JMSException    Transation manager was not found.
  +    * @param pool              The server session pool which we belong to.
  +    * @param session           Our session resource.
  +    * @param xaSession         Our XA session resource.
  +    * @param delegateListener  Listener to call when messages arrives.
  +    * @param useLocalTX       Will this session be used in a global TX (we can optimize with 1 phase commit)
  +    * @throws JMSException     Transation manager was not found.
  +    * @exception JMSException  Description of Exception
       */
      StdServerSession(final StdServerSessionPool pool,
  -                    final Session session,
  -                    final XASession xaSession)
  +      final Session session,
  +      final XASession xaSession,
  +      final MessageListener delegateListener,
  +      boolean useLocalTX)
         throws JMSException
      {
         // assert pool != null
  @@ -80,179 +97,308 @@
         this.serverSessionPool = pool;
         this.session = session;
         this.xaSession = xaSession;
  -
  -      if (log.isDebugEnabled()) {
  -         log.debug("initializing (pool, session, xaSession): " +
  -                   pool + ", " + session + ", " + xaSession);
  -      }
  +      this.delegateListener = delegateListener;
  +      if( xaSession == null )
  +         useLocalTX = false;
  +      this.useLocalTX = useLocalTX;
  +      
  +      log.debug("initializing (pool, session, xaSession, useLocalTX): " +
  +      pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
  +      
  +      // Set ourselves as message listener
  +      if (xaSession != null)
  +         xaSession.setMessageListener(this);
  +      else
  +         session.setMessageListener(this);
         
         InitialContext ctx = null;
  -      try {
  +      try
  +      {
            ctx = new InitialContext();
  -         tm = (TransactionManager)
  -            ctx.lookup(TransactionManagerService.JNDI_NAME);
  +         tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
         }
  -      catch (Exception e) {
  +      catch (Exception e)
  +      {
            throw new JMSException("Transation manager was not found");
         }
  -      finally {
  -         if (ctx != null) {
  -            try {
  +      finally
  +      {
  +         if (ctx != null)
  +         {
  +            try
  +            {
                  ctx.close();
               }
  -            catch (Exception ignore) {}
  +            catch (Exception ignore)
  +            {
  +            }
            }
         }
      }
  -
  +   
      // --- Impl of JMS standard API
  -     
  +   
      /**
  -    * Returns the session.
  +    * Returns the session. <p>
       *
  -    * <p>This simply returns what it has fetched from the connection. It is
  -    *    up to the jms provider to typecast it and have a private API to stuff
  -    *    messages into it.
  +    * This simply returns what it has fetched from the connection. It is up to
  +    * the jms provider to typecast it and have a private API to stuff messages
  +    * into it.
       *
  -    * @return    The session.
  +    * @return                  The session.
  +    * @exception JMSException  Description of Exception
       */
      public Session getSession() throws JMSException
      {
  -      return session;
  +      if (xaSession != null)
  +         return xaSession;
  +      else
  +         return session;
      }
  -
  +   
  +   //--- Protected parts, used by other in the package
  +   
      /**
  -    * Start the session and begin consuming messages.
  -    *
  -    * @throws JMSException    No listener has been specified.
  +    * Runs in an own thread, basically calls the session.run(), it is up to the
  +    * session to have been filled with messages and it will run against the
  +    * listener set in StdServerSessionPool. When it has send all its messages it
  +    * returns.
       */
  -   public void start() throws JMSException {
  -      log.debug("starting invokes on server session");
  -
  -      if (session != null) {
  -         try {
  -            serverSessionPool.getExecutor().execute(this);
  -         }
  -         catch (InterruptedException ignore) {}
  -      }
  -      else {
  -         throw new JMSException("No listener has been specified");
  -      }
  +   public void run()
  +   {
  +      if( log.isTraceEnabled() )
  +         log.trace("running...");
  +      if (xaSession != null)
  +         xaSession.run();
  +      else
  +         session.run();
      }
  -    
  -   //--- Protected parts, used by other in the package
  -     
      /**
  -    * Runs in an own thread, basically calls the session.run(), it is up
  -    * to the session to have been filled with messages and it will run
  -    * against the listener set in StdServerSessionPool. When it has send
  -    * all its messages it returns.
  +    * Will get called from session for each message stuffed into it.
       *
  -    * HC: run() also starts a transaction with the TransactionManager and
  -    * enlists the XAResource of the JMS XASession if a XASession was
  -    * available.  A good JMS implementation should provide the XASession
  -    * for use in the ASF.  So we optimize for the case where we have an
  -    * XASession.  So, for the case where we do not have an XASession and
  -    * the bean is not transacted, we have the unneeded overhead of creating
  -    * a Transaction.  I'm leaving it this way since it keeps the code simpler
  -    * and that case should not be too common (JBossMQ provides XASessions).
  +    * Starts a transaction with the TransactionManager
  +    * and enlists the XAResource of the JMS XASession if a XASession was
  +    * available. A good JMS implementation should provide the XASession for use
  +    * in the ASF. So we optimize for the case where we have an XASession. So,
  +    * for the case where we do not have an XASession and the bean is not
  +    * transacted, we have the unneeded overhead of creating a Transaction. I'm
  +    * leaving it this way since it keeps the code simpler and that case should
  +    * not be too common (JBossMQ provides XASessions).
       */
  -   public void run() {
  -      log.debug("running...");
  -      
  +   public void onMessage(Message msg)
  +   {      
  +      boolean trace = log.isTraceEnabled();
  +      if( trace )
  +         log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
  +         ", " + session + ", " + xaSession + ", " + useLocalTX);
  +
  +      // Used if run with useLocalTX if true
  +      Xid localXid = null;
  +      boolean localRollbackFlag=false;
  +      // Used if run with useLocalTX if false
         Transaction trans = null;
  -      try {
  -         tm.begin();
  -         trans = tm.getTransaction();
  -         
  -         if (xaSession != null) {
  +      try
  +      {
  +         
  +         if (useLocalTX)
  +         {
  +            // Use JBossMQ One Phase Commit to commit the TX
  +            localXid = new XidImpl();
               XAResource res = xaSession.getXAResource();
  -            trans.enlistResource(res);
  -            if (log.isDebugEnabled()) {
  -               log.debug("XAResource '"+res+"' enlisted.");
  +            res.start(localXid, XAResource.TMNOFLAGS);
  +            
  +            if( trace )
  +               log.trace("Using optimized 1p commit to control TX.");
  +         }
  +         else
  +         {
  +            
  +            // Use the TM to control the TX
  +            tm.begin();
  +            trans = tm.getTransaction();
  +            
  +            if (xaSession != null)
  +            {
  +               XAResource res = xaSession.getXAResource();
  +               trans.enlistResource(res);
  +               if( trace )
  +                  log.trace("XAResource '" + res + "' enlisted.");
               }
  -         } 
  -
  +         }
  +         //currentTransactionId = connection.spyXAResourceManager.startTx();
  +         
            // run the session
  -         session.run();
  +         //session.run();
  +         // Call delegate listener
  +         delegateListener.onMessage(msg);
         }
  -      catch (Exception e) {
  +      catch (Exception e)
  +      {
            log.error("session failed to run; setting rollback only", e);
  -         
  -         try {
  -            // The transaction will be rolledback in the finally
  -            trans.setRollbackOnly();
  +         
  +         if (useLocalTX)
  +         {
  +            // Use JBossMQ One Phase Commit to commit the TX
  +            localRollbackFlag = true;
            }
  -         catch (Exception x) {
  -            log.error("failed to set rollback only", x);
  +         else
  +         {
  +            // Mark TX for rollback via TM
  +            try
  +            {
  +               // The transaction will be rolledback in the finally
  +               if( trace )
  +                  log.trace("Using TM to mark TX for rollback.");
  +               trans.setRollbackOnly();
  +            }
  +            catch (Exception x)
  +            {
  +               log.error("failed to set rollback only", x);
  +            }
            }
  +         
         }
  -      finally {
  -         try {
  -            // Marked rollback
  -            if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
  -               log.info("Rolling back JMS transaction");
  -               // actually roll it back 
  -               trans.rollback();
  -                 
  -               // NO XASession? then manually rollback.  
  -               // This is not so good but
  -               // it's the best we can do if we have no XASession.
  -               if (xaSession == null && serverSessionPool.isTransacted()) {
  -                  session.rollback();
  +      finally
  +      {
  +         try
  +         {
  +            if (useLocalTX)
  +            {
  +               if( localRollbackFlag == true  )
  +               {
  +                  if( trace )
  +                     log.trace("Using optimized 1p commit to rollback TX.");
  +                  
  +                  XAResource res = xaSession.getXAResource();
  +                  res.end(localXid, XAResource.TMSUCCESS);
  +                  res.rollback(localXid);
  +                  
                  }
  -            } else if (trans.getStatus() == Status.STATUS_ACTIVE) {
  -               // Commit tx
  -               // This will happen if
  -               // a) everything goes well
  -               // b) app. exception was thrown
  -               trans.commit();
  -                 
  -               // NO XASession? then manually commit.  This is not so good but
  -               // it's the best we can do if we have no XASession.
  -               if (xaSession == null && serverSessionPool.isTransacted()) {
  -                  session.commit();
  +               else
  +               {
  +                  if( trace )
  +                     log.trace("Using optimized 1p commit to commit TX.");
  +                  
  +                  XAResource res = xaSession.getXAResource();
  +                  res.end(localXid, XAResource.TMSUCCESS);
  +                  res.commit(localXid, true);
                  }
               }
  +            else
  +            {
  +               // Use the TM to commit the Tx               
  +               // Marked rollback
  +               if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
  +               {
  +                  if( trace )
  +                     log.trace("Rolling back JMS transaction");
  +                  // actually roll it back
  +                  trans.rollback();
  +                  
  +                  // NO XASession? then manually rollback.
  +                  // This is not so good but
  +                  // it's the best we can do if we have no XASession.
  +                  if (xaSession == null && serverSessionPool.isTransacted())
  +                  {
  +                     session.rollback();
  +                  }
  +               }
  +               else if (trans.getStatus() == Status.STATUS_ACTIVE)
  +               {
  +                  // Commit tx
  +                  // This will happen if
  +                  // a) everything goes well
  +                  // b) app. exception was thrown
  +                  if( trace )
  +                     log.trace("Commiting the JMS transaction");
  +                  trans.commit();
  +
  +                  // NO XASession? then manually commit.  This is not so good but
  +                  // it's the best we can do if we have no XASession.
  +                  if (xaSession == null && serverSessionPool.isTransacted())
  +                  {
  +                     session.commit();
  +                  }
  +               }
  +            }            
            }
  -         catch (Exception e) {
  +         catch (Exception e)
  +         {
               log.error("failed to commit/rollback", e);
            }
  -         
  +         
            StdServerSession.this.recycle();
         }
  -
  -      log.debug("done");
  +      if( trace )
  +         log.trace("onMessage done");
      }
  -    
  +
      /**
  -    * This method is called by the ServerSessionPool when it is ready to
  -    * be recycled intot the pool
  +    * Start the session and begin consuming messages.
  +    *
  +    * @throws JMSException  No listener has been specified.
       */
  -   void recycle()
  +   public void start() throws JMSException
      {
  -      serverSessionPool.recycle(this);
  +      if( log.isTraceEnabled() )
  +         log.trace("starting invokes on server session");
  +      
  +      if (session != null)
  +      {
  +         try
  +         {
  +            serverSessionPool.getExecutor().execute(this);
  +         }
  +         catch (InterruptedException ignore)
  +         {
  +         }
  +      }
  +      else
  +      {
  +         throw new JMSException("No listener has been specified");
  +      }
      }
  -
  +   
      /**
       * Called by the ServerSessionPool when the sessions should be closed.
       */
  -   void close() {
  -      if (session != null) {
  -         try {
  +   void close()
  +   {
  +      if (session != null)
  +      {
  +         try
  +         {
               session.close();
  -         } catch (Exception ignore) {}
  +         }
  +         catch (Exception ignore)
  +         {
  +         }
            
            session = null;
         }
         
  -      if (xaSession != null) {
  -         try {
  +      if (xaSession != null)
  +      {
  +         try
  +         {
               xaSession.close();
  -         } catch (Exception ignore) {}
  +         }
  +         catch (Exception ignore)
  +         {
  +         }
            xaSession = null;
         }
  -
  +      
         log.debug("closed");
      }
  +   
  +   /**
  +    * This method is called by the ServerSessionPool when it is ready to be
  +    * recycled into the pool
  +    */
  +   void recycle()
  +   {
  +      serverSessionPool.recycle(this);
  +   }
  +   
   }
  
  
  
  1.4.6.3   +208 -151  jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
  
  Index: StdServerSessionPool.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v
  retrieving revision 1.4.6.2
  retrieving revision 1.4.6.3
  diff -u -r1.4.6.2 -r1.4.6.3
  --- StdServerSessionPool.java 2001/08/23 03:58:15     1.4.6.2
  +++ StdServerSessionPool.java 2001/12/14 03:22:11     1.4.6.3
  @@ -1,128 +1,132 @@
   /*
  - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
  + * JBoss, the OpenSource J2EE webOS
    *
  - * This library is free software; you can redistribute it and/or
  - * modify it under the terms of the GNU Lesser General Public
  - * License as published by the Free Software Foundation; either
  - * version 2 of the License, or (at your option) any later version
  - *
  - * This library is distributed in the hope that it will be useful,
  - * but WITHOUT ANY WARRANTY; without even the implied warranty of
  - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  - * Lesser General Public License for more details.
  - *
  - * You should have received a copy of the GNU Lesser General Public
  - * License along with this library; if not, write to the Free Software
  - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
   
  -import java.util.List;
  +import EDU.oswego.cs.dl.util.concurrent.Executor;
  +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
  +import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
   import java.util.ArrayList;
   import java.util.Iterator;
   
  +import java.util.List;
  +
   import javax.jms.Connection;
   import javax.jms.JMSException;
  +import javax.jms.MessageListener;
  +import javax.jms.QueueConnection;
   import javax.jms.ServerSession;
   import javax.jms.ServerSessionPool;
  -import javax.jms.MessageListener;
  +import javax.jms.Session;
   import javax.jms.TopicConnection;
  -import javax.jms.XATopicConnection;
  -import javax.jms.QueueConnection;
   import javax.jms.XAQueueConnection;
  -import javax.jms.Session;
  -import javax.jms.XASession;
   import javax.jms.XAQueueSession;
  +import javax.jms.XASession;
  +import javax.jms.XATopicConnection;
   import javax.jms.XATopicSession;
  -
  -import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
  -import EDU.oswego.cs.dl.util.concurrent.Executor;
  -import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
   
  -import org.apache.log4j.Category;
  +import org.jboss.logging.Logger;
   
   /**
  - * Implementation of ServerSessionPool.
  + * Implementation of ServerSessionPool. <p>
    *
  - * <p>Created: Thu Dec  7 17:02:03 2000
  + * Created: Thu Dec 7 17:02:03 2000
    *
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
  - * @version $Revision: 1.4.6.2 $
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Hiram Chirino</a> .
  + * @version   $Revision: 1.4.6.3 $
    */
   public class StdServerSessionPool
  -   implements ServerSessionPool
  +       implements ServerSessionPool
   {
  -   /** The default size of the pool. */
  -   private static final int DEFAULT_POOL_SIZE = 15;
  +   /**
  +    * The default size of the pool.
  +    */
  +   private final static int DEFAULT_POOL_SIZE = 15;
   
  -   /** The thread group which session workers will run. */
  +   /**
  +    * The thread group which session workers will run.
  +    */
      private static ThreadGroup threadGroup =
  -      new ThreadGroup("ASF Session Pool Threads");
  +         new ThreadGroup("ASF Session Pool Threads");
   
  -   /** Instance logger. */
  -   private final Category log = Category.getInstance(this.getClass());
  +   /**
  +    * Instance logger.
  +    */
  +   private final Logger log = Logger.getLogger(this.getClass());
   
  -   /** The size of the pool. */
  +   /**
  +    * The size of the pool.
  +    */
      private int poolSize;
   
  -   /** The message acknowledgment mode. */
  +   /**
  +    * The message acknowledgment mode.
  +    */
      private int ack;
   
  -   /** True if this is a transacted session. */
  +   /**
  +    * Is the bean container managed?
  +    */
  +   private boolean useLocalTX;
  +
  +   /**
  +    * True if this is a transacted session.
  +    */
      private boolean transacted;
   
  -   /** The session connection. */
  +   /**
  +    * The session connection.
  +    */
      private Connection con;
   
  -   /** The message listener for the session. */
  +   /**
  +    * The message listener for the session.
  +    */
      private MessageListener listener;
   
  -   /** The list of ServerSessions. */
  +   /**
  +    * The list of ServerSessions.
  +    */
      private List sessionPool;
   
  -   /** The executor for processing messages? */
  +   /**
  +    * The executor for processing messages?
  +    */
      private PooledExecutor executor;
   
  -   /** Used to signal when the Pool is being closed down */
  +   /**
  +    * Used to signal when the Pool is being closed down
  +    */
      private boolean closing = false;
   
  -   /** Used during close down to wait for all server sessions to be returned and closed.*/
  -   private int numServerSessions = 0;
  -
      /**
  -    * Construct a <tt>StdServerSessionPool</tt> using the default
  -    * pool size.
  -    *
  -    * @param con
  -    * @param transacted
  -    * @param ack
  -    * @param listener
  +    * Used during close down to wait for all server sessions to be returned and
  +    * closed.
       */
  -   public StdServerSessionPool(final Connection con,
  -                               final boolean transacted,
  -                               final int ack,
  -                               final MessageListener listener)
  -      throws JMSException
  -   {
  -      this(con, transacted, ack, listener, DEFAULT_POOL_SIZE);
  -   }
  +   private int numServerSessions = 0;
   
      /**
  -    * Construct a <tt>StdServerSessionPool</tt> using the default
  -    * pool size.
  +    * Construct a <tt>StdServerSessionPool</tt> using the default pool size.
       *
  -    * @param con
  -    * @param transacted
  -    * @param ack
  -    * @param listener
  -    * @param maxSession
  +    * @param con connection to get sessions from
  +    * @param transacted transaction mode when not XA (
  +    * @param ack ackmode when not XA
  +    * @param listener the listener the sessions will call
  +    * @param maxSession maximum number of sessions in the pool
  +    * @param isuseLocalTX  Description of Parameter
  +    * @exception JMSException    Description of Exception
       */
      public StdServerSessionPool(final Connection con,
  -                               final boolean transacted,
  -                               final int ack,
  -                               final MessageListener listener,
  -                               final int maxSession)
  -      throws JMSException
  +         final boolean transacted,
  +         final int ack,
  +         final boolean useLocalTX,
  +         final MessageListener listener,
  +         final int maxSession)
  +         throws JMSException
      {
         this.con = con;
         this.ack = ack;
  @@ -130,19 +134,29 @@
         this.transacted = transacted;
         this.poolSize = maxSession;
         this.sessionPool = new ArrayList(maxSession);
  +      this.useLocalTX = useLocalTX;
   
         // setup the worker pool
         executor = new PooledExecutor(poolSize);
         executor.setMinimumPoolSize(0);
  -      executor.setKeepAliveTime(1000*30);
  +      executor.setKeepAliveTime(1000 * 30);
         executor.waitWhenBlocked();
  -      executor.setThreadFactory(new ThreadFactory() {
  +      executor.setThreadFactory(
  +         new ThreadFactory()
  +         {
               private volatile int count = 0;
  -            
  -            public Thread newThread(final Runnable command) {
  +
  +            /**
  +             * #Description of the Method
  +             *
  +             * @param command  Description of Parameter
  +             * @return         Description of the Returned Value
  +             */
  +            public Thread newThread(final Runnable command)
  +            {
                  return new Thread(threadGroup,
  -                                 command,
  -                                 "Thread Pool Worker-" + count++);
  +                     command,
  +                     "Thread Pool Worker-" + count++);
               }
            });
   
  @@ -155,98 +169,78 @@
   
      /**
       * Get a server session.
  -    *
  -    * @return    A server session.
       *
  -    * @throws JMSException    Failed to get a server session.
  +    * @return               A server session.
  +    * @throws JMSException  Failed to get a server session.
       */
      public ServerSession getServerSession() throws JMSException
      {
  -      log.debug("getting a server session");
  +      if( log.isTraceEnabled() )
  +         log.trace("getting a server session");
         ServerSession session = null;
   
  -      try {
  -         while (true) {
  -            synchronized (sessionPool) {
  -               if(closing){
  +      try
  +      {
  +         while (true)
  +         {
  +            synchronized (sessionPool)
  +            {
  +               if (closing)
  +               {
                     throw new JMSException("Cannot get session after pool has been closed down.");
                  }
  -               else if (sessionPool.size() > 0) {
  +               else if (sessionPool.size() > 0)
  +               {
                     session = (ServerSession)sessionPool.remove(0);
                     break;
                  }
  -               else {
  -                  try {
  +               else
  +               {
  +                  try
  +                  {
                        sessionPool.wait();
  +                  }
  +                  catch (InterruptedException ignore)
  +                  {
                     }
  -                  catch (InterruptedException ignore) {}
                  }
               }
            }
         }
  -      catch (Exception e) {
  +      catch (Exception e)
  +      {
            throw new JMSException("Failed to get a server session: " + e);
         }
   
         // assert session != null
  -
  -      log.debug("using server session: " + session);
  +      if( log.isTraceEnabled() )
  +         log.trace("using server session: " + session);
         return session;
      }
   
  -   // --- Protected messages for StdServerSession to use
  -
  -   /**
  -    * Returns true if this server session is transacted.
  -    */
  -   boolean isTransacted() {
  -      return transacted;
  -   }
  -
  -   /**
  -    * Recycle a server session.
  -    */
  -   void recycle(StdServerSession session) {
  -      synchronized (sessionPool) {
  -         if(closing){
  -           session.close();
  -           numServerSessions--;
  -           if(numServerSessions == 0)  //notify clear thread.
  -              sessionPool.notifyAll();
  -         }else{
  -           sessionPool.add(session);
  -           sessionPool.notifyAll();
  -           log.debug("recycled server session: " + session);
  -         }
  -      }
  -   }
  -
  -   /**
  -    * Get the executor we are using.
  -    */
  -   Executor getExecutor() {
  -      return executor;
  -   }
  -
      /**
       * Clear the pool, clear out both threads and ServerSessions,
       * connection.stop() should be run before this method.
       */
  -   public void clear() {
  -      synchronized (sessionPool) {
  +   public void clear()
  +   {
  +      synchronized (sessionPool)
  +      {
            // FIXME - is there a runaway condition here. What if a
            // ServerSession are taken by a ConnecionConsumer? Should we set
            // a flag somehow so that no ServerSessions are recycled and the
            // ThreadPool won't leave any more threads out.
            closing = true;
   
  -         if (log.isDebugEnabled()) {
  +         if (log.isDebugEnabled())
  +         {
               log.debug("Clearing " + sessionPool.size() +
  -                      " from ServerSessionPool");
  +                  " from ServerSessionPool");
            }
   
            Iterator iter = sessionPool.iterator();
  -         while (iter.hasNext()) {
  +         while (iter.hasNext())
  +         {
               StdServerSession ses = (StdServerSession)iter.next();
               // Should we do anything to the server session?
               ses.close();
  @@ -261,9 +255,69 @@
         executor.shutdownAfterProcessingCurrentlyQueuedTasks();
   
         //wait for all server sessions to be returned.
  -      synchronized(sessionPool){
  -         while(numServerSessions > 0)
  -            try{ sessionPool.wait(); }catch(InterruptedException ignore){}
  +      synchronized (sessionPool)
  +      {
  +         while (numServerSessions > 0)
  +         {
  +            try
  +            {
  +               sessionPool.wait();
  +            }
  +            catch (InterruptedException ignore)
  +            {
  +            }
  +         }
  +      }
  +   }
  +
  +   /**
  +    * Get the executor we are using.
  +    *
  +    * @return   The Executor value
  +    */
  +   Executor getExecutor()
  +   {
  +      return executor;
  +   }
  +
  +   // --- Protected messages for StdServerSession to use
  +
  +   /**
  +    * Returns true if this server session is transacted.
  +    *
  +    * @return   The Transacted value
  +    */
  +   boolean isTransacted()
  +   {
  +      return transacted;
  +   }
  +
  +   /**
  +    * Recycle a server session.
  +    *
  +    * @param session  Description of Parameter
  +    */
  +   void recycle(StdServerSession session)
  +   {
  +      synchronized (sessionPool)
  +      {
  +         if (closing)
  +         {
  +            session.close();
  +            numServerSessions--;
  +            if (numServerSessions == 0)
  +            {
  +               //notify clear thread.
  +               sessionPool.notifyAll();
  +            }
  +         }
  +         else
  +         {
  +            sessionPool.add(session);
  +            sessionPool.notifyAll();
  +            if( log.isTraceEnabled() )
  +               log.trace("recycled server session: " + session);
  +         }
         }
      }
   
  @@ -271,44 +325,47 @@
   
      private void init() throws JMSException
      {
  -      for (int index = 0; index < poolSize; index++) {
  +      for (int index = 0; index < poolSize; index++)
  +      {
            // Here is the meat, that MUST follow the spec
            Session ses = null;
            XASession xaSes = null;
   
            log.debug("initializing with connection: " + con);
   
  -         if (con instanceof XATopicConnection) {
  +         if (con instanceof XATopicConnection)
  +         {
               xaSes = ((XATopicConnection)con).createXATopicSession();
               ses = ((XATopicSession)xaSes).getTopicSession();
            }
  -         else if (con instanceof XAQueueConnection) {
  +         else if (con instanceof XAQueueConnection)
  +         {
               xaSes = ((XAQueueConnection)con).createXAQueueSession();
               ses = ((XAQueueSession)xaSes).getQueueSession();
            }
  -         else if (con instanceof TopicConnection) {
  +         else if (con instanceof TopicConnection)
  +         {
               ses = ((TopicConnection)con).createTopicSession(transacted, ack);
               log.warn("Using a non-XA TopicConnection.  " +
  -                     "It will not be able to participate in a Global UOW");
  +                  "It will not be able to participate in a Global UOW");
            }
  -         else if (con instanceof QueueConnection) {
  +         else if (con instanceof QueueConnection)
  +         {
               ses = ((QueueConnection)con).createQueueSession(transacted, ack);
               log.warn("Using a non-XA QueueConnection.  " +
  -                     "It will not be able to participate in a Global UOW");
  +                  "It will not be able to participate in a Global UOW");
            }
  -         else {
  +         else
  +         {
               // should never happen really
               log.error("Connection was not reconizable: " + con);
               throw new JMSException("Connection was not reconizable: " + con);
            }
   
  -         // This might not be totaly spec compliant since it says that app
  -         // server should create as many message listeners its needs.
  -         log.debug("setting session listener: " + listener);
  -         ses.setMessageListener(listener);
  +         // create the server session and add it to the pool - it is up to the
  +         // server session to set the listener
  +         StdServerSession serverSession = new StdServerSession(this, ses, xaSes, listener, useLocalTX);
   
  -         // create the server session and add it to the pool
  -         ServerSession serverSession = new StdServerSession(this, ses, xaSes);
            sessionPool.add(serverSession);
            numServerSessions++;
            log.debug("added server session to the pool: " + serverSession);
  
  
  
  1.3.6.3   +32 -45    jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
  
  Index: StdServerSessionPoolFactory.java
  ===================================================================
  RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java,v
  retrieving revision 1.3.6.2
  retrieving revision 1.3.6.3
  diff -u -r1.3.6.2 -r1.3.6.3
  --- StdServerSessionPoolFactory.java  2001/08/23 03:58:15     1.3.6.2
  +++ StdServerSessionPoolFactory.java  2001/12/14 03:22:11     1.3.6.3
  @@ -1,93 +1,80 @@
   /*
  - * Copyright (c) 2000 Peter Antman DN <[EMAIL PROTECTED]>
  + * JBoss, the OpenSource J2EE webOS
    *
  - * This library is free software; you can redistribute it and/or
  - * modify it under the terms of the GNU Lesser General Public
  - * License as published by the Free Software Foundation; either
  - * version 2 of the License, or (at your option) any later version
  - * 
  - * This library is distributed in the hope that it will be useful,
  - * but WITHOUT ANY WARRANTY; without even the implied warranty of
  - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  - * Lesser General Public License for more details.
  - * 
  - * You should have received a copy of the GNU Lesser General Public
  - * License along with this library; if not, write to the Free Software
  - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  + * Distributable under LGPL license.
  + * See terms of license at gnu.org.
    */
   package org.jboss.jms.asf;
   
   import java.io.Serializable;
  -
  -import javax.jms.ServerSessionPool;
  -import javax.jms.MessageListener;
   import javax.jms.Connection;
   import javax.jms.JMSException;
  +import javax.jms.MessageListener;
   
  +import javax.jms.ServerSessionPool;
  +
   /**
  - * An implementation of ServerSessionPoolFactory.
  + * An implementation of ServerSessionPoolFactory. <p>
    *
  - * <p>Created: Fri Dec 22 09:47:41 2000
  + * Created: Fri Dec 22 09:47:41 2000
    *
  - * @author <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a>.
  - * @version $Revision: 1.3.6.2 $
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Peter Antman</a> .
  + * @author    <a href="mailto:[EMAIL PROTECTED]";>Hiram Chirino</a> .
  + * @version   $Revision: 1.3.6.3 $
    */
   public class StdServerSessionPoolFactory
  -   implements ServerSessionPoolFactory, Serializable
  +       implements ServerSessionPoolFactory, Serializable
   {
  -   /** The name of this factory. */
  +   /**
  +    * The name of this factory.
  +    */
      private String name;
   
      /**
  -    * Construct a <tt>StdServerSessionPoolFactory</tt>.
  +    * Construct a <tt>StdServerSessionPoolFactory</tt> .
       */
  -   public StdServerSessionPoolFactory() {
  +   public StdServerSessionPoolFactory()
  +   {
         super();
      }
   
      /**
       * Set the name of the factory.
       *
  -    * @param name    The name of the factory.
  +    * @param name  The name of the factory.
       */
  -   public void setName(final String name) {
  +   public void setName(final String name)
  +   {
         this.name = name;
      }
  -   
  +
      /**
       * Get the name of the factory.
       *
  -    * @return    The name of the factory.
  +    * @return   The name of the factory.
       */
  -   public String getName() {
  +   public String getName()
  +   {
         return name;
      }
   
      /**
  -    * Create a new <tt>ServerSessionPool</tt>.
  +    * Create a new <tt>ServerSessionPool</tt> .
       *
       * @param con
       * @param maxSession
       * @param isTransacted
       * @param ack
       * @param listener
  -    * @return                A new pool.
  -    *
  +    * @param isContainerManaged          Description of Parameter
  +    * @return                            A new pool.
       * @throws JMSException
  +    * @exception javax.jms.JMSException  Description of Exception
       */
  -   public ServerSessionPool getServerSessionPool(final Connection con,
  -                                                 final int maxSession,
  -                                                 final boolean isTransacted,
  -                                                 final int ack,
  -                                                 final MessageListener listener)
  -      throws JMSException
  +   public ServerSessionPool getServerSessionPool(Connection con, int maxSession,
  +      boolean isTransacted, int ack, boolean useLocalTX, MessageListener listener) throws JMSException
      {
  -      ServerSessionPool pool = (ServerSessionPool)
  -         new StdServerSessionPool(con,
  -                                  isTransacted,
  -                                  ack,
  -                                  listener,
  -                                  maxSession);
  +      ServerSessionPool pool = new StdServerSessionPool(con, isTransacted, ack, useLocalTX, listener, maxSession);
         return pool;
      }
   }
  
  
  

_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development

Reply via email to