User: starksm Date: 01/12/13 19:22:11 Modified: src/main/org/jboss/jms/asf Tag: Branch_2_4 ServerSessionPoolFactory.java ServerSessionPoolLoader.java ServerSessionPoolLoaderMBean.java StdServerSession.java StdServerSessionPool.java StdServerSessionPoolFactory.java Log: Integrate changes from 3.0 to improve the MDB/ASF layer. This includes support for the dead message queue for repeated MDB.onMessage failures. Revision Changes Path No revision No revision 1.1.6.3 +20 -29 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java Index: ServerSessionPoolFactory.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java,v retrieving revision 1.1.6.2 retrieving revision 1.1.6.3 diff -u -r1.1.6.2 -r1.1.6.3 --- ServerSessionPoolFactory.java 2001/08/23 03:58:15 1.1.6.2 +++ ServerSessionPoolFactory.java 2001/12/14 03:22:11 1.1.6.3 @@ -1,67 +1,58 @@ /* - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]> + * JBoss, the OpenSource J2EE webOS * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * Distributable under LGPL license. + * See terms of license at gnu.org. */ package org.jboss.jms.asf; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.MessageListener; import javax.jms.ServerSessionPool; -import javax.jms.JMSException; /** - * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects. + * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects. <p> * - * <p>Created: Wed Nov 29 15:55:21 2000 + * Created: Wed Nov 29 15:55:21 2000 * - * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>. - * @version $Revision: 1.1.6.2 $ + * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> . + * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> . + * @version $Revision: 1.1.6.3 $ */ public interface ServerSessionPoolFactory { /** * Set the name of the factory. * - * @param name The name of the factory. + * @param name The name of the factory. */ void setName(String name); /** * Get the name of the factory. * - * @return The name of the factory. + * @return The name of the factory. */ String getName(); /** - * Create a new <tt>ServerSessionPool</tt>. + * Create a new <tt>ServerSessionPool</tt> . * * @param con * @param maxSession * @param isTransacted * @param ack * @param listener - * @return A new pool. - * + * @param useLocalTX + * @return A new pool. * @throws JMSException */ ServerSessionPool getServerSessionPool(Connection con, - int maxSession, - boolean isTransacted, - int ack, - MessageListener listener) - throws JMSException; + int maxSession, + boolean isTransacted, + int ack, + boolean useLocalTX, + MessageListener listener) + throws JMSException; } 1.2.6.3 +6 -22 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java Index: ServerSessionPoolLoader.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java,v retrieving revision 1.2.6.2 retrieving revision 1.2.6.3 diff -u -r1.2.6.2 -r1.2.6.3 --- ServerSessionPoolLoader.java 2001/08/23 03:58:15 1.2.6.2 +++ ServerSessionPoolLoader.java 2001/12/14 03:22:11 1.2.6.3 @@ -27,8 +27,6 @@ import javax.naming.NamingException; import javax.naming.NameNotFoundException; -import org.apache.log4j.Category; - import org.jboss.util.ServiceMBeanSupport; /** @@ -38,15 +36,12 @@ * * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>. * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a> - * @version $Revision: 1.2.6.2 $ + * @version $Revision: 1.2.6.3 $ */ public class ServerSessionPoolLoader extends ServiceMBeanSupport implements ServerSessionPoolLoaderMBean { - /** Instance logger. */ - private final Category log = Category.getInstance(this.getClass()); - /** The factory used to create server session pools. */ private ServerSessionPoolFactory poolFactory; @@ -121,22 +116,6 @@ return name; } - /** - * Initialize the service. - * - * <p>Setup the pool factory. - * - * @throws ClassNotFoundException Could not find pool factory class. - * @throws Exception Failed to create pool factory instance. - */ - protected void initService() throws Exception - { - Class cls = Class.forName(poolFactoryClass); - poolFactory = (ServerSessionPoolFactory)cls.newInstance(); - poolFactory.setName(name); - - log.debug("initalized with pool factory: " + poolFactory); - } /** * Start the service. @@ -147,6 +126,11 @@ */ protected void startService() throws Exception { + Class cls = Class.forName(poolFactoryClass); + poolFactory = (ServerSessionPoolFactory)cls.newInstance(); + poolFactory.setName(name); + + log.debug("initialized with pool factory: " + poolFactory); InitialContext ctx = new InitialContext(); String name = poolFactory.getName(); String jndiname = "java:/" + name; 1.2.6.3 +1 -1 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java Index: ServerSessionPoolLoaderMBean.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java,v retrieving revision 1.2.6.2 retrieving revision 1.2.6.3 diff -u -r1.2.6.2 -r1.2.6.3 --- ServerSessionPoolLoaderMBean.java 2001/08/23 03:58:15 1.2.6.2 +++ ServerSessionPoolLoaderMBean.java 2001/12/14 03:22:11 1.2.6.3 @@ -25,7 +25,7 @@ * <p>Created: Wed Nov 29 16:20:17 2000 * * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>. - * @version $Revision: 1.2.6.2 $ + * @version $Revision: 1.2.6.3 $ */ public interface ServerSessionPoolLoaderMBean extends ServiceMBean 1.4.6.3 +307 -161 jboss/src/main/org/jboss/jms/asf/StdServerSession.java Index: StdServerSession.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v retrieving revision 1.4.6.2 retrieving revision 1.4.6.3 diff -u -r1.4.6.2 -r1.4.6.3 --- StdServerSession.java 2001/08/23 03:58:15 1.4.6.2 +++ StdServerSession.java 2001/12/14 03:22:11 1.4.6.3 @@ -1,77 +1,94 @@ /* - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]> + * JBoss, the OpenSource J2EE webOS * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * Distributable under LGPL license. + * See terms of license at gnu.org. */ package org.jboss.jms.asf; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; import javax.jms.ServerSession; import javax.jms.Session; import javax.jms.XASession; - import javax.naming.InitialContext; - import javax.transaction.Status; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import javax.transaction.xa.XAResource; - -import org.apache.log4j.Category; +import javax.transaction.xa.Xid; +import org.jboss.logging.Logger; import org.jboss.tm.TransactionManagerService; +import org.jboss.tm.XidImpl; /** - * An implementation of ServerSession. + * An implementation of ServerSession. <p> * - * <p>Created: Thu Dec 7 18:25:40 2000 + * Created: Thu Dec 7 18:25:40 2000 * - * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>. - * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a> - * @version $Revision: 1.4.6.2 $ + * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> . + * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a> + * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> . + * @version $Revision: 1.4.6.3 $ */ public class StdServerSession - implements Runnable, ServerSession + implements Runnable, ServerSession, MessageListener { - /** Instance logger. */ - private final Category log = Category.getInstance(this.getClass()); - - /** The server session pool which we belong to. */ - private StdServerSessionPool serverSessionPool; // = null; - - /** Our session resource. */ - private Session session; // = null; - - /** Our XA session resource. */ - private XASession xaSession; // = null; - - /** The transaction manager that we will use for transactions. */ + /** + * Instance logger. + */ + static Logger log = Logger.getLogger(StdServerSession.class); + + /** + * The server session pool which we belong to. + */ + private StdServerSessionPool serverSessionPool; + + /** + * Our session resource. + */ + private Session session; + + /** + * Our XA session resource. + */ + private XASession xaSession; + + /** + * The transaction manager that we will use for transactions. + */ private TransactionManager tm; - + /** - * Create a <tt>StdServerSession</tt>. - * - * @param pool The server session pool which we belong to. - * @param session Our session resource. - * @param xaSession Our XA session resource. + * Use the session's XAResource directly if we have an JBossMQ XASession. + * this allows us to get around the TX timeout problem when you have + * extensive message processing. + */ + private boolean useLocalTX; + + /** + * The listener to delegate calls, to. In our case the container invoker. + */ + private MessageListener delegateListener; + + /** + * Create a <tt>StdServerSession</tt> . * - * @throws JMSException Transation manager was not found. + * @param pool The server session pool which we belong to. + * @param session Our session resource. + * @param xaSession Our XA session resource. + * @param delegateListener Listener to call when messages arrives. + * @param useLocalTX Will this session be used in a global TX (we can optimize with 1 phase commit) + * @throws JMSException Transation manager was not found. + * @exception JMSException Description of Exception */ StdServerSession(final StdServerSessionPool pool, - final Session session, - final XASession xaSession) + final Session session, + final XASession xaSession, + final MessageListener delegateListener, + boolean useLocalTX) throws JMSException { // assert pool != null @@ -80,179 +97,308 @@ this.serverSessionPool = pool; this.session = session; this.xaSession = xaSession; - - if (log.isDebugEnabled()) { - log.debug("initializing (pool, session, xaSession): " + - pool + ", " + session + ", " + xaSession); - } + this.delegateListener = delegateListener; + if( xaSession == null ) + useLocalTX = false; + this.useLocalTX = useLocalTX; + + log.debug("initializing (pool, session, xaSession, useLocalTX): " + + pool + ", " + session + ", " + xaSession + ", " + useLocalTX); + + // Set out self as message listener + if (xaSession != null) + xaSession.setMessageListener(this); + else + session.setMessageListener(this); InitialContext ctx = null; - try { + try + { ctx = new InitialContext(); - tm = (TransactionManager) - ctx.lookup(TransactionManagerService.JNDI_NAME); + tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME); } - catch (Exception e) { + catch (Exception e) + { throw new JMSException("Transation manager was not found"); } - finally { - if (ctx != null) { - try { + finally + { + if (ctx != null) + { + try + { ctx.close(); } - catch (Exception ignore) {} + catch (Exception ignore) + { + } } } } - + // --- Impl of JMS standard API - + /** - * Returns the session. + * Returns the session. <p> * - * <p>This simply returns what it has fetched from the connection. It is - * up to the jms provider to typecast it and have a private API to stuff - * messages into it. + * This simply returns what it has fetched from the connection. It is up to + * the jms provider to typecast it and have a private API to stuff messages + * into it. * - * @return The session. + * @return The session. + * @exception JMSException Description of Exception */ public Session getSession() throws JMSException { - return session; + if (xaSession != null) + return xaSession; + else + return session; } - + + //--- Protected parts, used by other in the package + /** - * Start the session and begin consuming messages. - * - * @throws JMSException No listener has been specified. + * Runs in an own thread, basically calls the session.run(), it is up to the + * session to have been filled with messages and it will run against the + * listener set in StdServerSessionPool. When it has send all its messages it + * returns. */ - public void start() throws JMSException { - log.debug("starting invokes on server session"); - - if (session != null) { - try { - serverSessionPool.getExecutor().execute(this); - } - catch (InterruptedException ignore) {} - } - else { - throw new JMSException("No listener has been specified"); - } + public void run() + { + if( log.isTraceEnabled() ) + log.trace("running..."); + if (xaSession != null) + xaSession.run(); + else + session.run(); } - - //--- Protected parts, used by other in the package - /** - * Runs in an own thread, basically calls the session.run(), it is up - * to the session to have been filled with messages and it will run - * against the listener set in StdServerSessionPool. When it has send - * all its messages it returns. + * Will get called from session for each message stuffed into it. * - * HC: run() also starts a transaction with the TransactionManager and - * enlists the XAResource of the JMS XASession if a XASession was - * available. A good JMS implementation should provide the XASession - * for use in the ASF. So we optimize for the case where we have an - * XASession. So, for the case where we do not have an XASession and - * the bean is not transacted, we have the unneeded overhead of creating - * a Transaction. I'm leaving it this way since it keeps the code simpler - * and that case should not be too common (JBossMQ provides XASessions). + * Starts a transaction with the TransactionManager + * and enlists the XAResource of the JMS XASession if a XASession was + * available. A good JMS implementation should provide the XASession for use + * in the ASF. So we optimize for the case where we have an XASession. So, + * for the case where we do not have an XASession and the bean is not + * transacted, we have the unneeded overhead of creating a Transaction. I'm + * leaving it this way since it keeps the code simpler and that case should + * not be too common (JBossMQ provides XASessions). */ - public void run() { - log.debug("running..."); - + public void onMessage(Message msg) + { + boolean trace = log.isTraceEnabled(); + if( trace ) + log.trace("onMessage running (pool, session, xaSession, useLocalTX): " + + ", " + session + ", " + xaSession + ", " + useLocalTX); + + // Used if run with useLocalTX if true + Xid localXid = null; + boolean localRollbackFlag=false; + // Used if run with useLocalTX if false Transaction trans = null; - try { - tm.begin(); - trans = tm.getTransaction(); - - if (xaSession != null) { + try + { + + if (useLocalTX) + { + // Use JBossMQ One Phase Commit to commit the TX + localXid = new XidImpl(); XAResource res = xaSession.getXAResource(); - trans.enlistResource(res); - if (log.isDebugEnabled()) { - log.debug("XAResource '"+res+"' enlisted."); + res.start(localXid, XAResource.TMNOFLAGS); + + if( trace ) + log.trace("Using optimized 1p commit to control TX."); + } + else + { + + // Use the TM to control the TX + tm.begin(); + trans = tm.getTransaction(); + + if (xaSession != null) + { + XAResource res = xaSession.getXAResource(); + trans.enlistResource(res); + if( trace ) + log.trace("XAResource '" + res + "' enlisted."); } - } - + } + //currentTransactionId = connection.spyXAResourceManager.startTx(); + // run the session - session.run(); + //session.run(); + // Call delegate listener + delegateListener.onMessage(msg); } - catch (Exception e) { + catch (Exception e) + { log.error("session failed to run; setting rollback only", e); - - try { - // The transaction will be rolledback in the finally - trans.setRollbackOnly(); + + if (useLocalTX) + { + // Use JBossMQ One Phase Commit to commit the TX + localRollbackFlag = true; } - catch (Exception x) { - log.error("failed to set rollback only", x); + else + { + // Mark for tollback TX via TM + try + { + // The transaction will be rolledback in the finally + if( trace ) + log.trace("Using TM to mark TX for rollback."); + trans.setRollbackOnly(); + } + catch (Exception x) + { + log.error("failed to set rollback only", x); + } } + } - finally { - try { - // Marked rollback - if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) { - log.info("Rolling back JMS transaction"); - // actually roll it back - trans.rollback(); - - // NO XASession? then manually rollback. - // This is not so good but - // it's the best we can do if we have no XASession. - if (xaSession == null && serverSessionPool.isTransacted()) { - session.rollback(); + finally + { + try + { + if (useLocalTX) + { + if( localRollbackFlag == true ) + { + if( trace ) + log.trace("Using optimized 1p commit to rollback TX."); + + XAResource res = xaSession.getXAResource(); + res.end(localXid, XAResource.TMSUCCESS); + res.rollback(localXid); + } - } else if (trans.getStatus() == Status.STATUS_ACTIVE) { - // Commit tx - // This will happen if - // a) everything goes well - // b) app. exception was thrown - trans.commit(); - - // NO XASession? then manually commit. This is not so good but - // it's the best we can do if we have no XASession. - if (xaSession == null && serverSessionPool.isTransacted()) { - session.commit(); + else + { + if( trace ) + log.trace("Using optimized 1p commit to commit TX."); + + XAResource res = xaSession.getXAResource(); + res.end(localXid, XAResource.TMSUCCESS); + res.commit(localXid, true); } } + else + { + // Use the TM to commit the Tx + // Marked rollback + if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) + { + if( trace ) + log.trace("Rolling back JMS transaction"); + // actually roll it back + trans.rollback(); + + // NO XASession? then manually rollback. + // This is not so good but + // it's the best we can do if we have no XASession. + if (xaSession == null && serverSessionPool.isTransacted()) + { + session.rollback(); + } + } + else if (trans.getStatus() == Status.STATUS_ACTIVE) + { + // Commit tx + // This will happen if + // a) everything goes well + // b) app. exception was thrown + if( trace ) + log.trace("Commiting the JMS transaction"); + trans.commit(); + + // NO XASession? then manually commit. This is not so good but + // it's the best we can do if we have no XASession. + if (xaSession == null && serverSessionPool.isTransacted()) + { + session.commit(); + } + } + } } - catch (Exception e) { + catch (Exception e) + { log.error("failed to commit/rollback", e); } - + StdServerSession.this.recycle(); } - - log.debug("done"); + if( trace ) + log.trace("onMessage done"); } - + /** - * This method is called by the ServerSessionPool when it is ready to - * be recycled intot the pool + * Start the session and begin consuming messages. + * + * @throws JMSException No listener has been specified. */ - void recycle() + public void start() throws JMSException { - serverSessionPool.recycle(this); + if( log.isTraceEnabled() ) + log.trace("starting invokes on server session"); + + if (session != null) + { + try + { + serverSessionPool.getExecutor().execute(this); + } + catch (InterruptedException ignore) + { + } + } + else + { + throw new JMSException("No listener has been specified"); + } } - + /** * Called by the ServerSessionPool when the sessions should be closed. */ - void close() { - if (session != null) { - try { + void close() + { + if (session != null) + { + try + { session.close(); - } catch (Exception ignore) {} + } + catch (Exception ignore) + { + } session = null; } - if (xaSession != null) { - try { + if (xaSession != null) + { + try + { xaSession.close(); - } catch (Exception ignore) {} + } + catch (Exception ignore) + { + } xaSession = null; } - + log.debug("closed"); } + + /** + * This method is called by the ServerSessionPool when it is ready to be + * recycled intot the pool + */ + void recycle() + { + serverSessionPool.recycle(this); + } + } 1.4.6.3 +208 -151 jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java Index: StdServerSessionPool.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v retrieving revision 1.4.6.2 retrieving revision 1.4.6.3 diff -u -r1.4.6.2 -r1.4.6.3 --- StdServerSessionPool.java 2001/08/23 03:58:15 1.4.6.2 +++ StdServerSessionPool.java 2001/12/14 03:22:11 1.4.6.3 @@ -1,128 +1,132 @@ /* - * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]> + * JBoss, the OpenSource J2EE webOS * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * Distributable under LGPL license. + * See terms of license at gnu.org. */ package org.jboss.jms.asf; -import java.util.List; +import EDU.oswego.cs.dl.util.concurrent.Executor; +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; +import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; import java.util.ArrayList; import java.util.Iterator; +import java.util.List; + import javax.jms.Connection; import javax.jms.JMSException; +import javax.jms.MessageListener; +import javax.jms.QueueConnection; import javax.jms.ServerSession; import javax.jms.ServerSessionPool; -import javax.jms.MessageListener; +import javax.jms.Session; import javax.jms.TopicConnection; -import javax.jms.XATopicConnection; -import javax.jms.QueueConnection; import javax.jms.XAQueueConnection; -import javax.jms.Session; -import javax.jms.XASession; import javax.jms.XAQueueSession; +import javax.jms.XASession; +import javax.jms.XATopicConnection; import javax.jms.XATopicSession; - -import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; -import EDU.oswego.cs.dl.util.concurrent.Executor; -import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; -import org.apache.log4j.Category; +import org.jboss.logging.Logger; /** - * Implementation of ServerSessionPool. + * Implementation of ServerSessionPool. <p> * - * <p>Created: Thu Dec 7 17:02:03 2000 + * Created: Thu Dec 7 17:02:03 2000 * - * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>. - * @version $Revision: 1.4.6.2 $ + * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> . + * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> . + * @version $Revision: 1.4.6.3 $ */ public class StdServerSessionPool - implements ServerSessionPool + implements ServerSessionPool { - /** The default size of the pool. */ - private static final int DEFAULT_POOL_SIZE = 15; + /** + * The default size of the pool. + */ + private final static int DEFAULT_POOL_SIZE = 15; - /** The thread group which session workers will run. */ + /** + * The thread group which session workers will run. + */ private static ThreadGroup threadGroup = - new ThreadGroup("ASF Session Pool Threads"); + new ThreadGroup("ASF Session Pool Threads"); - /** Instance logger. */ - private final Category log = Category.getInstance(this.getClass()); + /** + * Instance logger. + */ + private final Logger log = Logger.getLogger(this.getClass()); - /** The size of the pool. */ + /** + * The size of the pool. + */ private int poolSize; - /** The message acknowledgment mode. */ + /** + * The message acknowledgment mode. + */ private int ack; - /** True if this is a transacted session. */ + /** + * Is the bean container managed? + */ + private boolean useLocalTX; + + /** + * True if this is a transacted session. + */ private boolean transacted; - /** The session connection. */ + /** + * The session connection. + */ private Connection con; - /** The message listener for the session. */ + /** + * The message listener for the session. + */ private MessageListener listener; - /** The list of ServerSessions. */ + /** + * The list of ServerSessions. + */ private List sessionPool; - /** The executor for processing messages? */ + /** + * The executor for processing messages? + */ private PooledExecutor executor; - /** Used to signal when the Pool is being closed down */ + /** + * Used to signal when the Pool is being closed down + */ private boolean closing = false; - /** Used during close down to wait for all server sessions to be returned and closed.*/ - private int numServerSessions = 0; - /** - * Construct a <tt>StdServerSessionPool</tt> using the default - * pool size. - * - * @param con - * @param transacted - * @param ack - * @param listener + * Used during close down to wait for all server sessions to be returned and + * closed. */ - public StdServerSessionPool(final Connection con, - final boolean transacted, - final int ack, - final MessageListener listener) - throws JMSException - { - this(con, transacted, ack, listener, DEFAULT_POOL_SIZE); - } + private int numServerSessions = 0; /** - * Construct a <tt>StdServerSessionPool</tt> using the default - * pool size. + * Construct a <tt>StdServerSessionPool</tt> using the default pool size. * - * @param con - * @param transacted - * @param ack - * @param listener - * @param maxSession + * @param con connection to get sessions from + * @param transacted transaction mode when not XA ( + * @param ack ackmode when not XA + * @param listener the listener the sessions will call + * @param maxSession maximum number of sessions in the pool + * @param isuseLocalTX Description of Parameter + * @exception JMSException Description of Exception */ public StdServerSessionPool(final Connection con, - final boolean transacted, - final int ack, - final MessageListener listener, - final int maxSession) - throws JMSException + final boolean transacted, + final int ack, + final boolean useLocalTX, + final MessageListener listener, + final int maxSession) + throws JMSException { this.con = con; this.ack = ack; @@ -130,19 +134,29 @@ this.transacted = transacted; this.poolSize = maxSession; this.sessionPool = new ArrayList(maxSession); + this.useLocalTX = useLocalTX; // setup the worker pool executor = new PooledExecutor(poolSize); executor.setMinimumPoolSize(0); - executor.setKeepAliveTime(1000*30); + executor.setKeepAliveTime(1000 * 30); executor.waitWhenBlocked(); - executor.setThreadFactory(new ThreadFactory() { + executor.setThreadFactory( + new ThreadFactory() + { private volatile int count = 0; - - public Thread newThread(final Runnable command) { + + /** + * #Description of the Method + * + * @param command Description of Parameter + * @return Description of the Returned Value + */ + public Thread newThread(final Runnable command) + { return new Thread(threadGroup, - command, - "Thread Pool Worker-" + count++); + command, + "Thread Pool Worker-" + count++); } }); @@ -155,98 +169,78 @@ /** * Get a server session. - * - * @return A server session. * - * @throws JMSException Failed to get a server session. + * @return A server session. + * @throws JMSException Failed to get a server session. */ public ServerSession getServerSession() throws JMSException { - log.debug("getting a server session"); + if( log.isTraceEnabled() ) + log.trace("getting a server session"); ServerSession session = null; - try { - while (true) { - synchronized (sessionPool) { - if(closing){ + try + { + while (true) + { + synchronized (sessionPool) + { + if (closing) + { throw new JMSException("Cannot get session after pool has been closed down."); } - else if (sessionPool.size() > 0) { + else if (sessionPool.size() > 0) + { session = (ServerSession)sessionPool.remove(0); break; } - else { - try { + else + { + try + { sessionPool.wait(); + } + catch (InterruptedException ignore) + { } - catch (InterruptedException ignore) {} } } } } - catch (Exception e) { + catch (Exception e) + { throw new JMSException("Failed to get a server session: " + e); } // assert session != null - - log.debug("using server session: " + session); + if( log.isTraceEnabled() ) + log.trace("using server session: " + session); return session; } - // --- Protected messages for StdServerSession to use - - /** - * Returns true if this server session is transacted. - */ - boolean isTransacted() { - return transacted; - } - - /** - * Recycle a server session. - */ - void recycle(StdServerSession session) { - synchronized (sessionPool) { - if(closing){ - session.close(); - numServerSessions--; - if(numServerSessions == 0) //notify clear thread. - sessionPool.notifyAll(); - }else{ - sessionPool.add(session); - sessionPool.notifyAll(); - log.debug("recycled server session: " + session); - } - } - } - - /** - * Get the executor we are using. - */ - Executor getExecutor() { - return executor; - } - /** * Clear the pool, clear out both threads and ServerSessions, * connection.stop() should be run before this method. */ - public void clear() { - synchronized (sessionPool) { + public void clear() + { + synchronized (sessionPool) + { // FIXME - is there a runaway condition here. What if a // ServerSession are taken by a ConnecionConsumer? Should we set // a flag somehow so that no ServerSessions are recycled and the // ThreadPool won't leave any more threads out. closing = true; - if (log.isDebugEnabled()) { + if (log.isDebugEnabled()) + { log.debug("Clearing " + sessionPool.size() + - " from ServerSessionPool"); + " from ServerSessionPool"); } Iterator iter = sessionPool.iterator(); - while (iter.hasNext()) { + while (iter.hasNext()) + { StdServerSession ses = (StdServerSession)iter.next(); // Should we do anything to the server session? ses.close(); @@ -261,9 +255,69 @@ executor.shutdownAfterProcessingCurrentlyQueuedTasks(); //wait for all server sessions to be returned. - synchronized(sessionPool){ - while(numServerSessions > 0) - try{ sessionPool.wait(); }catch(InterruptedException ignore){} + synchronized (sessionPool) + { + while (numServerSessions > 0) + { + try + { + sessionPool.wait(); + } + catch (InterruptedException ignore) + { + } + } + } + } + + /** + * Get the executor we are using. + * + * @return The Executor value + */ + Executor getExecutor() + { + return executor; + } + + // --- Protected messages for StdServerSession to use + + /** + * Returns true if this server session is transacted. + * + * @return The Transacted value + */ + boolean isTransacted() + { + return transacted; + } + + /** + * Recycle a server session. + * + * @param session Description of Parameter + */ + void recycle(StdServerSession session) + { + synchronized (sessionPool) + { + if (closing) + { + session.close(); + numServerSessions--; + if (numServerSessions == 0) + { + //notify clear thread. + sessionPool.notifyAll(); + } + } + else + { + sessionPool.add(session); + sessionPool.notifyAll(); + if( log.isTraceEnabled() ) + log.trace("recycled server session: " + session); + } } } @@ -271,44 +325,47 @@ private void init() throws JMSException { - for (int index = 0; index < poolSize; index++) { + for (int index = 0; index < poolSize; index++) + { // Here is the meat, that MUST follow the spec Session ses = null; XASession xaSes = null; log.debug("initializing with connection: " + con); - if (con instanceof XATopicConnection) { + if (con instanceof XATopicConnection) + { xaSes = ((XATopicConnection)con).createXATopicSession(); ses = ((XATopicSession)xaSes).getTopicSession(); } - else if (con instanceof XAQueueConnection) { + else if (con instanceof XAQueueConnection) + { xaSes = ((XAQueueConnection)con).createXAQueueSession(); ses = ((XAQueueSession)xaSes).getQueueSession(); } - else if (con instanceof TopicConnection) { + else if (con instanceof TopicConnection) + { ses = ((TopicConnection)con).createTopicSession(transacted, ack); log.warn("Using a non-XA TopicConnection. " + - "It will not be able to participate in a Global UOW"); + "It will not be able to participate in a Global UOW"); } - else if (con instanceof QueueConnection) { + else if (con instanceof QueueConnection) + { ses = ((QueueConnection)con).createQueueSession(transacted, ack); log.warn("Using a non-XA QueueConnection. " + - "It will not be able to participate in a Global UOW"); + "It will not be able to participate in a Global UOW"); } - else { + else + { // should never happen really log.error("Connection was not reconizable: " + con); throw new JMSException("Connection was not reconizable: " + con); } - // This might not be totaly spec compliant since it says that app - // server should create as many message listeners its needs. - log.debug("setting session listener: " + listener); - ses.setMessageListener(listener); + // create the server session and add it to the pool - it is up to the + // server session to set the listener + StdServerSession serverSession = new StdServerSession(this, ses, xaSes, listener, useLocalTX); - // create the server session and add it to the pool - ServerSession serverSession = new StdServerSession(this, ses, xaSes); sessionPool.add(serverSession); numServerSessions++; log.debug("added server session to the pool: " + serverSession); 1.3.6.3 +32 -45 jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java Index: StdServerSessionPoolFactory.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java,v retrieving revision 1.3.6.2 retrieving revision 1.3.6.3 diff -u -r1.3.6.2 -r1.3.6.3 --- StdServerSessionPoolFactory.java 2001/08/23 03:58:15 1.3.6.2 +++ StdServerSessionPoolFactory.java 2001/12/14 03:22:11 1.3.6.3 @@ -1,93 +1,80 @@ /* - * Copyright (c) 2000 Peter Antman DN <[EMAIL PROTECTED]> + * JBoss, the OpenSource J2EE webOS * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * Distributable under LGPL license. + * See terms of license at gnu.org. */ package org.jboss.jms.asf; import java.io.Serializable; - -import javax.jms.ServerSessionPool; -import javax.jms.MessageListener; import javax.jms.Connection; import javax.jms.JMSException; +import javax.jms.MessageListener; +import javax.jms.ServerSessionPool; + /** - * An implementation of ServerSessionPoolFactory. + * An implementation of ServerSessionPoolFactory. <p> * - * <p>Created: Fri Dec 22 09:47:41 2000 + * Created: Fri Dec 22 09:47:41 2000 * - * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>. - * @version $Revision: 1.3.6.2 $ + * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> . + * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> . + * @version $Revision: 1.3.6.3 $ */ public class StdServerSessionPoolFactory - implements ServerSessionPoolFactory, Serializable + implements ServerSessionPoolFactory, Serializable { - /** The name of this factory. */ + /** + * The name of this factory. + */ private String name; /** - * Construct a <tt>StdServerSessionPoolFactory</tt>. + * Construct a <tt>StdServerSessionPoolFactory</tt> . */ - public StdServerSessionPoolFactory() { + public StdServerSessionPoolFactory() + { super(); } /** * Set the name of the factory. * - * @param name The name of the factory. + * @param name The name of the factory. */ - public void setName(final String name) { + public void setName(final String name) + { this.name = name; } - + /** * Get the name of the factory. * - * @return The name of the factory. + * @return The name of the factory. */ - public String getName() { + public String getName() + { return name; } /** - * Create a new <tt>ServerSessionPool</tt>. + * Create a new <tt>ServerSessionPool</tt> . * * @param con * @param maxSession * @param isTransacted * @param ack * @param listener - * @return A new pool. - * + * @param isContainerManaged Description of Parameter + * @return A new pool. * @throws JMSException + * @exception javax.jms.JMSException Description of Exception */ - public ServerSessionPool getServerSessionPool(final Connection con, - final int maxSession, - final boolean isTransacted, - final int ack, - final MessageListener listener) - throws JMSException + public ServerSessionPool getServerSessionPool(Connection con, int maxSession, + boolean isTransacted, int ack, boolean useLocalTX, MessageListener listener) throws JMSException { - ServerSessionPool pool = (ServerSessionPool) - new StdServerSessionPool(con, - isTransacted, - ack, - listener, - maxSession); + ServerSessionPool pool = new StdServerSessionPool(con, isTransacted, ack, useLocalTX, listener, maxSession); return pool; } }
_______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] https://lists.sourceforge.net/lists/listinfo/jboss-development