User: user57
Date: 01/07/21 13:27:13
Modified: src/main/org/jboss/jms/asf StdServerSession.java
StdServerSessionPool.java
Log:
o re-indent and javadoc'd most of StdServerSessionPool
o changed sessionPool from a Vector to an ArrayList (since it was always
in a synchronized block)
Revision Changes Path
1.7 +4 -1 jboss/src/main/org/jboss/jms/asf/StdServerSession.java
Index: StdServerSession.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- StdServerSession.java 2001/07/21 04:18:28 1.6
+++ StdServerSession.java 2001/07/21 20:27:13 1.7
@@ -40,7 +40,7 @@
*
* @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
* @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
- * @version $Revision: 1.6 $
+ * @version $Revision: 1.7 $
*/
public class StdServerSession
implements Runnable, ServerSession
@@ -74,6 +74,9 @@
final XASession xaSession)
throws JMSException
{
+ // assert pool != null
+ // assert session != null
+
this.serverSessionPool = pool;
this.session = session;
this.xaSession = xaSession;
1.10 +150 -100 jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Index: StdServerSessionPool.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- StdServerSessionPool.java 2001/07/21 04:18:28 1.9
+++ StdServerSessionPool.java 2001/07/21 20:27:13 1.10
@@ -17,8 +17,9 @@
*/
package org.jboss.jms.asf;
-import java.util.Vector;
-import java.util.Enumeration;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -37,7 +38,6 @@
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
-import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import org.apache.log4j.Category;
@@ -47,108 +47,163 @@
* <p>Created: Thu Dec 7 17:02:03 2000
*
* @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @version $Revision: 1.9 $
+ * @version $Revision: 1.10 $
*/
public class StdServerSessionPool
implements ServerSessionPool
{
+ /** The default size of the pool. */
+ private static final int DEFAULT_POOL_SIZE = 15;
+
+ /** The thread group which session workers will run. */
private static ThreadGroup threadGroup =
new ThreadGroup("ASF Session Pool Threads");
- private static final int DEFAULT_POOL_SIZE = 15;
/** Instance logger. */
private final Category log = Category.getInstance(this.getClass());
-
- private int poolSize = DEFAULT_POOL_SIZE;
+
+ /** The size of the pool. */
+ private int poolSize;
+
+ /** The message acknowledgment mode. */
private int ack;
+
+ /** True if this is a transacted session. */
private boolean transacted;
- private MessageListener listener;
+
+ /** The session connection. */
private Connection con;
- private Vector sessionPool = new Vector();
- private PooledExecutor executor;
- boolean isTransacted() {
- return transacted;
- }
-
+ /** The message listener for the session. */
+ private MessageListener listener;
+
+ /** The list of ServerSessions. */
+ private List sessionPool;
+
+ /** The executor for processing messages? */
+ private PooledExecutor executor;
+
/**
- * Minimal constructor, could also have stuff for pool size
+ * Construct a <tt>StdServerSessionPool</tt> using the default
+ * pool size.
+ *
+ * @param con
+ * @param transacted
+ * @param ack
+ * @param listener
*/
- public StdServerSessionPool(Connection con,
- boolean transacted,
- int ack,
- MessageListener listener)
+ public StdServerSessionPool(final Connection con,
+ final boolean transacted,
+ final int ack,
+ final MessageListener listener)
throws JMSException
{
- this(con,transacted,ack,listener,DEFAULT_POOL_SIZE);
+ this(con, transacted, ack, listener, DEFAULT_POOL_SIZE);
}
- public StdServerSessionPool(Connection con,
- boolean transacted,
- int ack,
- MessageListener listener,
- int maxSession)
+ /**
+ * Construct a <tt>StdServerSessionPool</tt> using the default
+ * pool size.
+ *
+ * @param con
+ * @param transacted
+ * @param ack
+ * @param listener
+ * @param maxSession
+ */
+ public StdServerSessionPool(final Connection con,
+ final boolean transacted,
+ final int ack,
+ final MessageListener listener,
+ final int maxSession)
throws JMSException
{
- this.con= con;
- this.ack= ack;
- this.listener= listener;
- this.transacted= transacted;
- this.poolSize= maxSession;
-
+ this.con = con;
+ this.ack = ack;
+ this.listener = listener;
+ this.transacted = transacted;
+ this.poolSize = maxSession;
+ this.sessionPool = new ArrayList(maxSession);
+
+ // setup the worker pool
executor = new PooledExecutor(poolSize);
executor.setMinimumPoolSize(0);
executor.setKeepAliveTime(1000*30);
executor.waitWhenBlocked();
executor.setThreadFactory(new ThreadFactory() {
- public Thread newThread(Runnable command) {
+ public Thread newThread(final Runnable command) {
return new Thread(threadGroup, command, "Thread Pool Worker");
}
});
-
+
+ // finish initializing the session
init();
log.debug("Server Session pool set up");
}
// --- JMS API for ServerSessionPool
- public ServerSession getServerSession()
- throws JMSException
+ /**
+ * Get a server session.
+ *
+ * @return A server session.
+ *
+ * @throws JMSException Failed to get a server session.
+ */
+ public ServerSession getServerSession() throws JMSException
{
- ServerSession result = null;
- log.debug("Leaving out a server session");
+ log.debug("getting a server session");
+ ServerSession session = null;
+
try {
- for (;;) {
- if (sessionPool.size() > 0) {
- result = (ServerSession)sessionPool.remove(0);
- break;
- }
- else {
- try {
- synchronized (sessionPool) {
+ while (true) {
+ synchronized (sessionPool) {
+ if (sessionPool.size() > 0) {
+ session = (ServerSession)sessionPool.remove(0);
+ break;
+ }
+ else {
+ try {
sessionPool.wait();
}
+ catch (InterruptedException ignore) {}
}
- catch (InterruptedException ignore) {}
- }
+ }
}
}
catch (Exception e) {
- throw new JMSException("Error in getServerSession: " + e);
+ throw new JMSException("Failed to get a server session: " + e);
}
+
+ // assert session != null
- return result;
+ log.debug("using server session: " + session);
+ return session;
}
// --- Protected messages for StdServerSession to use
+ /**
+ * Returns true if this server session is transacted.
+ */
+ boolean isTransacted() {
+ return transacted;
+ }
+
+ /**
+ * Recycle a server session.
+ */
void recycle(StdServerSession session) {
synchronized (sessionPool) {
- sessionPool.addElement(session);
+ sessionPool.add(session);
sessionPool.notifyAll();
+ log.debug("recycled server session: " + session);
}
}
+ /**
+ * Get the executor we are using.
+ */
Executor getExecutor() {
return executor;
}
@@ -161,18 +216,18 @@
synchronized (sessionPool) {
// FIXME - is there a runaway condition here. What if a
// ServerSession are taken by a ConnecionConsumer? Should we set
- // a flag somehow so that no
- // ServerSessions are recycled and the ThreadPool don't leve any
- // more threads out.
+ // a flag somehow so that no ServerSessions are recycled and the
+ // ThreadPool won't leave any more threads out.
+
if (log.isDebugEnabled()) {
log.debug("Clearing " + sessionPool.size() +
" from ServerSessionPool");
}
- Enumeration e = sessionPool.elements();
- while (e.hasMoreElements()) {
- StdServerSession ses = (StdServerSession)e.nextElement();
- // Should we do any thing to the server session?
+ Iterator iter = sessionPool.iterator();
+ while (iter.hasNext()) {
+ StdServerSession ses = (StdServerSession)iter.next();
+ // Should we do anything to the server session?
ses.close();
}
@@ -187,50 +242,45 @@
private void init() throws JMSException
{
for (int index = 0; index < poolSize; index++) {
- try {
- // Here is the meat, that MUST follow the spec
- Session ses = null;
- XASession xaSes = null;
-
- log.debug("initializing with connection: " + con);
-
- if (con instanceof XATopicConnection) {
- xaSes = ((XATopicConnection)con).createXATopicSession();
- ses = ((XATopicSession)xaSes).getTopicSession();
- }
- else if (con instanceof XAQueueConnection) {
- xaSes = ((XAQueueConnection)con).createXAQueueSession();
- ses = ((XAQueueSession)xaSes).getQueueSession();
- }
- else if (con instanceof TopicConnection) {
- ses = ((TopicConnection)con).createTopicSession(transacted, ack);
- log.warn("Using a non-XA TopicConnection. " +
- "It will not be able to participate in a Global UOW");
- }
- else if (con instanceof QueueConnection) {
- ses = ((QueueConnection)con).createQueueSession(transacted, ack);
- log.warn("Using a non-XA QueueConnection. " +
- "It will not be able to participate in a Global UOW");
- }
- else {
- log.debug("Error in getting session for con: " + con);
- throw new JMSException("Connection was not reconizable: " + con);
- }
-
- // This might not be totala spec compliant since it
- // says that app server should create as many
- // message listeners its needs,
- log.debug("Setting listener for session");
- ses.setMessageListener(listener);
- sessionPool.addElement(new StdServerSession(this, ses, xaSes));
- }
- catch (JMSException exception) {
- if (log.isDebugEnabled()) {
- log.debug("Error in adding to pool: " + exception
- + " Pool: " + this
- + " listener: " + listener);
- }
+ // Here is the meat, that MUST follow the spec
+ Session ses = null;
+ XASession xaSes = null;
+
+ log.debug("initializing with connection: " + con);
+
+ if (con instanceof XATopicConnection) {
+ xaSes = ((XATopicConnection)con).createXATopicSession();
+ ses = ((XATopicSession)xaSes).getTopicSession();
+ }
+ else if (con instanceof XAQueueConnection) {
+ xaSes = ((XAQueueConnection)con).createXAQueueSession();
+ ses = ((XAQueueSession)xaSes).getQueueSession();
+ }
+ else if (con instanceof TopicConnection) {
+ ses = ((TopicConnection)con).createTopicSession(transacted, ack);
+ log.warn("Using a non-XA TopicConnection. " +
+ "It will not be able to participate in a Global UOW");
}
+ else if (con instanceof QueueConnection) {
+ ses = ((QueueConnection)con).createQueueSession(transacted, ack);
+ log.warn("Using a non-XA QueueConnection. " +
+ "It will not be able to participate in a Global UOW");
+ }
+ else {
+ // should never happen really
+ log.error("Connection was not reconizable: " + con);
+ throw new JMSException("Connection was not reconizable: " + con);
+ }
+
+ // This might not be totaly spec compliant since it says that app
+ // server should create as many message listeners its needs.
+ log.debug("setting session listener: " + listener);
+ ses.setMessageListener(listener);
+
+ // create the server session and add it to the pool
+ ServerSession serverSession = new StdServerSession(this, ses, xaSes);
+ sessionPool.add(serverSession);
+ log.debug("added server session to the pool: " + serverSession);
}
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
http://lists.sourceforge.net/lists/listinfo/jboss-development