User: user57 Date: 01/07/21 13:27:13 Modified: src/main/org/jboss/jms/asf StdServerSession.java StdServerSessionPool.java Log: o re-indent and javadoc'd most of StdServerSessionPool o changed sessionPool from a Vector to an ArrayList (since it was always in a synchronized block) Revision Changes Path 1.7 +4 -1 jboss/src/main/org/jboss/jms/asf/StdServerSession.java Index: StdServerSession.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v retrieving revision 1.6 retrieving revision 1.7 diff -u -r1.6 -r1.7 --- StdServerSession.java 2001/07/21 04:18:28 1.6 +++ StdServerSession.java 2001/07/21 20:27:13 1.7 @@ -40,7 +40,7 @@ * * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>. * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a> - * @version $Revision: 1.6 $ + * @version $Revision: 1.7 $ */ public class StdServerSession implements Runnable, ServerSession @@ -74,6 +74,9 @@ final XASession xaSession) throws JMSException { + // assert pool != null + // assert session != null + this.serverSessionPool = pool; this.session = session; this.xaSession = xaSession; 1.10 +150 -100 jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java Index: StdServerSessionPool.java =================================================================== RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- StdServerSessionPool.java 2001/07/21 04:18:28 1.9 +++ StdServerSessionPool.java 2001/07/21 20:27:13 1.10 @@ -17,8 +17,9 @@ */ package org.jboss.jms.asf; -import java.util.Vector; -import java.util.Enumeration; +import java.util.List; +import java.util.ArrayList; +import java.util.Iterator; import javax.jms.Connection; import javax.jms.JMSException; @@ -37,7 +38,6 @@ import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; import EDU.oswego.cs.dl.util.concurrent.Executor; import EDU.oswego.cs.dl.util.concurrent.ThreadFactory; -import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer; import org.apache.log4j.Category; @@ -47,108 +47,163 @@ * <p>Created: Thu Dec 7 17:02:03 2000 * * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>. - * @version $Revision: 1.9 $ + * @version $Revision: 1.10 $ */ public class StdServerSessionPool implements ServerSessionPool { + /** The default size of the pool. */ + private static final int DEFAULT_POOL_SIZE = 15; + + /** The thread group which session workers will run. */ private static ThreadGroup threadGroup = new ThreadGroup("ASF Session Pool Threads"); - private static final int DEFAULT_POOL_SIZE = 15; /** Instance logger. */ private final Category log = Category.getInstance(this.getClass()); - - private int poolSize = DEFAULT_POOL_SIZE; + + /** The size of the pool. */ + private int poolSize; + + /** The message acknowledgment mode. */ private int ack; + + /** True if this is a transacted session. */ private boolean transacted; - private MessageListener listener; + + /** The session connection. */ private Connection con; - private Vector sessionPool = new Vector(); - private PooledExecutor executor; - boolean isTransacted() { - return transacted; - } - + /** The message listener for the session. */ + private MessageListener listener; + + /** The list of ServerSessions. */ + private List sessionPool; + + /** The executor for processing messages? */ + private PooledExecutor executor; + /** - * Minimal constructor, could also have stuff for pool size + * Construct a <tt>StdServerSessionPool</tt> using the default + * pool size. + * + * @param con + * @param transacted + * @param ack + * @param listener */ - public StdServerSessionPool(Connection con, - boolean transacted, - int ack, - MessageListener listener) + public StdServerSessionPool(final Connection con, + final boolean transacted, + final int ack, + final MessageListener listener) throws JMSException { - this(con,transacted,ack,listener,DEFAULT_POOL_SIZE); + this(con, transacted, ack, listener, DEFAULT_POOL_SIZE); } - public StdServerSessionPool(Connection con, - boolean transacted, - int ack, - MessageListener listener, - int maxSession) + /** + * Construct a <tt>StdServerSessionPool</tt> using the default + * pool size. + * + * @param con + * @param transacted + * @param ack + * @param listener + * @param maxSession + */ + public StdServerSessionPool(final Connection con, + final boolean transacted, + final int ack, + final MessageListener listener, + final int maxSession) throws JMSException { - this.con= con; - this.ack= ack; - this.listener= listener; - this.transacted= transacted; - this.poolSize= maxSession; - + this.con = con; + this.ack = ack; + this.listener = listener; + this.transacted = transacted; + this.poolSize = maxSession; + this.sessionPool = new ArrayList(maxSession); + + // setup the worker pool executor = new PooledExecutor(poolSize); executor.setMinimumPoolSize(0); executor.setKeepAliveTime(1000*30); executor.waitWhenBlocked(); executor.setThreadFactory(new ThreadFactory() { - public Thread newThread(Runnable command) { + public Thread newThread(final Runnable command) { return new Thread(threadGroup, command, "Thread Pool Worker"); } }); - + + // finish initializing the session init(); log.debug("Server Session pool set up"); } // --- JMS API for ServerSessionPool - public ServerSession getServerSession() - throws JMSException + /** + * Get a server session. + * + * @return A server session. + * + * @throws JMSException Failed to get a server session. + */ + public ServerSession getServerSession() throws JMSException { - ServerSession result = null; - log.debug("Leaving out a server session"); + log.debug("getting a server session"); + ServerSession session = null; + try { - for (;;) { - if (sessionPool.size() > 0) { - result = (ServerSession)sessionPool.remove(0); - break; - } - else { - try { - synchronized (sessionPool) { + while (true) { + synchronized (sessionPool) { + if (sessionPool.size() > 0) { + session = (ServerSession)sessionPool.remove(0); + break; + } + else { + try { sessionPool.wait(); } + catch (InterruptedException ignore) {} } - catch (InterruptedException ignore) {} - } + } } } catch (Exception e) { - throw new JMSException("Error in getServerSession: " + e); + throw new JMSException("Failed to get a server session: " + e); } + + // assert session != null - return result; + log.debug("using server session: " + session); + return session; } // --- Protected messages for StdServerSession to use + /** + * Returns true if this server session is transacted. + */ + boolean isTransacted() { + return transacted; + } + + /** + * Recycle a server session. + */ void recycle(StdServerSession session) { synchronized (sessionPool) { - sessionPool.addElement(session); + sessionPool.add(session); sessionPool.notifyAll(); + log.debug("recycled server session: " + session); } } + /** + * Get the executor we are using. + */ Executor getExecutor() { return executor; } @@ -161,18 +216,18 @@ synchronized (sessionPool) { // FIXME - is there a runaway condition here. What if a // ServerSession are taken by a ConnecionConsumer? Should we set - // a flag somehow so that no - // ServerSessions are recycled and the ThreadPool don't leve any - // more threads out. + // a flag somehow so that no ServerSessions are recycled and the + // ThreadPool won't leave any more threads out. + if (log.isDebugEnabled()) { log.debug("Clearing " + sessionPool.size() + " from ServerSessionPool"); } - Enumeration e = sessionPool.elements(); - while (e.hasMoreElements()) { - StdServerSession ses = (StdServerSession)e.nextElement(); - // Should we do any thing to the server session? + Iterator iter = sessionPool.iterator(); + while (iter.hasNext()) { + StdServerSession ses = (StdServerSession)iter.next(); + // Should we do anything to the server session? ses.close(); } @@ -187,50 +242,45 @@ private void init() throws JMSException { for (int index = 0; index < poolSize; index++) { - try { - // Here is the meat, that MUST follow the spec - Session ses = null; - XASession xaSes = null; - - log.debug("initializing with connection: " + con); - - if (con instanceof XATopicConnection) { - xaSes = ((XATopicConnection)con).createXATopicSession(); - ses = ((XATopicSession)xaSes).getTopicSession(); - } - else if (con instanceof XAQueueConnection) { - xaSes = ((XAQueueConnection)con).createXAQueueSession(); - ses = ((XAQueueSession)xaSes).getQueueSession(); - } - else if (con instanceof TopicConnection) { - ses = ((TopicConnection)con).createTopicSession(transacted, ack); - log.warn("Using a non-XA TopicConnection. " + - "It will not be able to participate in a Global UOW"); - } - else if (con instanceof QueueConnection) { - ses = ((QueueConnection)con).createQueueSession(transacted, ack); - log.warn("Using a non-XA QueueConnection. " + - "It will not be able to participate in a Global UOW"); - } - else { - log.debug("Error in getting session for con: " + con); - throw new JMSException("Connection was not reconizable: " + con); - } - - // This might not be totala spec compliant since it - // says that app server should create as many - // message listeners its needs, - log.debug("Setting listener for session"); - ses.setMessageListener(listener); - sessionPool.addElement(new StdServerSession(this, ses, xaSes)); - } - catch (JMSException exception) { - if (log.isDebugEnabled()) { - log.debug("Error in adding to pool: " + exception - + " Pool: " + this - + " listener: " + listener); - } + // Here is the meat, that MUST follow the spec + Session ses = null; + XASession xaSes = null; + + log.debug("initializing with connection: " + con); + + if (con instanceof XATopicConnection) { + xaSes = ((XATopicConnection)con).createXATopicSession(); + ses = ((XATopicSession)xaSes).getTopicSession(); + } + else if (con instanceof XAQueueConnection) { + xaSes = ((XAQueueConnection)con).createXAQueueSession(); + ses = ((XAQueueSession)xaSes).getQueueSession(); + } + else if (con instanceof TopicConnection) { + ses = ((TopicConnection)con).createTopicSession(transacted, ack); + log.warn("Using a non-XA TopicConnection. " + + "It will not be able to participate in a Global UOW"); } + else if (con instanceof QueueConnection) { + ses = ((QueueConnection)con).createQueueSession(transacted, ack); + log.warn("Using a non-XA QueueConnection. " + + "It will not be able to participate in a Global UOW"); + } + else { + // should never happen really + log.error("Connection was not reconizable: " + con); + throw new JMSException("Connection was not reconizable: " + con); + } + + // This might not be totaly spec compliant since it says that app + // server should create as many message listeners its needs. + log.debug("setting session listener: " + listener); + ses.setMessageListener(listener); + + // create the server session and add it to the pool + ServerSession serverSession = new StdServerSession(this, ses, xaSes); + sessionPool.add(serverSession); + log.debug("added server session to the pool: " + serverSession); } } } _______________________________________________ Jboss-development mailing list [EMAIL PROTECTED] http://lists.sourceforge.net/lists/listinfo/jboss-development