User: starksm
Date: 01/12/13 19:22:11
Modified: src/main/org/jboss/jms/asf Tag: Branch_2_4
ServerSessionPoolFactory.java
ServerSessionPoolLoader.java
ServerSessionPoolLoaderMBean.java
StdServerSession.java StdServerSessionPool.java
StdServerSessionPoolFactory.java
Log:
Integrate changes from 3.0 to improve the MDB/ASF layer. This includes
support for the dead message queue for repeated MDB.onMessage failures.
Revision Changes Path
No revision
No revision
1.1.6.3 +20 -29 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java
Index: ServerSessionPoolFactory.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolFactory.java,v
retrieving revision 1.1.6.2
retrieving revision 1.1.6.3
diff -u -r1.1.6.2 -r1.1.6.3
--- ServerSessionPoolFactory.java 2001/08/23 03:58:15 1.1.6.2
+++ ServerSessionPoolFactory.java 2001/12/14 03:22:11 1.1.6.3
@@ -1,67 +1,58 @@
/*
- * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.ServerSessionPool;
-import javax.jms.JMSException;
/**
- * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects.
+ * Defines the model for creating <tt>ServerSessionPoolFactory</tt> objects. <p>
*
- * <p>Created: Wed Nov 29 15:55:21 2000
+ * Created: Wed Nov 29 15:55:21 2000
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @version $Revision: 1.1.6.2 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> .
+ * @version $Revision: 1.1.6.3 $
*/
public interface ServerSessionPoolFactory
{
/**
* Set the name of the factory.
*
- * @param name The name of the factory.
+ * @param name The name of the factory.
*/
void setName(String name);
/**
* Get the name of the factory.
*
- * @return The name of the factory.
+ * @return The name of the factory.
*/
String getName();
/**
- * Create a new <tt>ServerSessionPool</tt>.
+ * Create a new <tt>ServerSessionPool</tt> .
*
* @param con
* @param maxSession
* @param isTransacted
* @param ack
* @param listener
- * @return A new pool.
- *
+ * @param useLocalTX
+ * @return A new pool.
* @throws JMSException
*/
ServerSessionPool getServerSessionPool(Connection con,
- int maxSession,
- boolean isTransacted,
- int ack,
- MessageListener listener)
- throws JMSException;
+ int maxSession,
+ boolean isTransacted,
+ int ack,
+ boolean useLocalTX,
+ MessageListener listener)
+ throws JMSException;
}
1.2.6.3 +6 -22 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java
Index: ServerSessionPoolLoader.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoader.java,v
retrieving revision 1.2.6.2
retrieving revision 1.2.6.3
diff -u -r1.2.6.2 -r1.2.6.3
--- ServerSessionPoolLoader.java 2001/08/23 03:58:15 1.2.6.2
+++ ServerSessionPoolLoader.java 2001/12/14 03:22:11 1.2.6.3
@@ -27,8 +27,6 @@
import javax.naming.NamingException;
import javax.naming.NameNotFoundException;
-import org.apache.log4j.Category;
-
import org.jboss.util.ServiceMBeanSupport;
/**
@@ -38,15 +36,12 @@
*
* @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
* @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
- * @version $Revision: 1.2.6.2 $
+ * @version $Revision: 1.2.6.3 $
*/
public class ServerSessionPoolLoader
extends ServiceMBeanSupport
implements ServerSessionPoolLoaderMBean
{
- /** Instance logger. */
- private final Category log = Category.getInstance(this.getClass());
-
/** The factory used to create server session pools. */
private ServerSessionPoolFactory poolFactory;
@@ -121,22 +116,6 @@
return name;
}
- /**
- * Initialize the service.
- *
- * <p>Setup the pool factory.
- *
- * @throws ClassNotFoundException Could not find pool factory class.
- * @throws Exception Failed to create pool factory instance.
- */
- protected void initService() throws Exception
- {
- Class cls = Class.forName(poolFactoryClass);
- poolFactory = (ServerSessionPoolFactory)cls.newInstance();
- poolFactory.setName(name);
-
- log.debug("initalized with pool factory: " + poolFactory);
- }
/**
* Start the service.
@@ -147,6 +126,11 @@
*/
protected void startService() throws Exception
{
+ Class cls = Class.forName(poolFactoryClass);
+ poolFactory = (ServerSessionPoolFactory)cls.newInstance();
+ poolFactory.setName(name);
+
+ log.debug("initialized with pool factory: " + poolFactory);
InitialContext ctx = new InitialContext();
String name = poolFactory.getName();
String jndiname = "java:/" + name;
1.2.6.3 +1 -1 jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java
Index: ServerSessionPoolLoaderMBean.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/ServerSessionPoolLoaderMBean.java,v
retrieving revision 1.2.6.2
retrieving revision 1.2.6.3
diff -u -r1.2.6.2 -r1.2.6.3
--- ServerSessionPoolLoaderMBean.java 2001/08/23 03:58:15 1.2.6.2
+++ ServerSessionPoolLoaderMBean.java 2001/12/14 03:22:11 1.2.6.3
@@ -25,7 +25,7 @@
* <p>Created: Wed Nov 29 16:20:17 2000
*
* @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @version $Revision: 1.2.6.2 $
+ * @version $Revision: 1.2.6.3 $
*/
public interface ServerSessionPoolLoaderMBean
extends ServiceMBean
1.4.6.3 +307 -161 jboss/src/main/org/jboss/jms/asf/StdServerSession.java
Index: StdServerSession.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSession.java,v
retrieving revision 1.4.6.2
retrieving revision 1.4.6.3
diff -u -r1.4.6.2 -r1.4.6.3
--- StdServerSession.java 2001/08/23 03:58:15 1.4.6.2
+++ StdServerSession.java 2001/12/14 03:22:11 1.4.6.3
@@ -1,77 +1,94 @@
/*
- * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
import javax.jms.ServerSession;
import javax.jms.Session;
import javax.jms.XASession;
-
import javax.naming.InitialContext;
-
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
-
-import org.apache.log4j.Category;
+import javax.transaction.xa.Xid;
+import org.jboss.logging.Logger;
import org.jboss.tm.TransactionManagerService;
+import org.jboss.tm.XidImpl;
/**
- * An implementation of ServerSession.
+ * An implementation of ServerSession. <p>
*
- * <p>Created: Thu Dec 7 18:25:40 2000
+ * Created: Thu Dec 7 18:25:40 2000
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
- * @version $Revision: 1.4.6.2 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> .
+ * @version $Revision: 1.4.6.3 $
*/
public class StdServerSession
- implements Runnable, ServerSession
+ implements Runnable, ServerSession, MessageListener
{
- /** Instance logger. */
- private final Category log = Category.getInstance(this.getClass());
-
- /** The server session pool which we belong to. */
- private StdServerSessionPool serverSessionPool; // = null;
-
- /** Our session resource. */
- private Session session; // = null;
-
- /** Our XA session resource. */
- private XASession xaSession; // = null;
-
- /** The transaction manager that we will use for transactions. */
+ /**
+ * Instance logger.
+ */
+ static Logger log = Logger.getLogger(StdServerSession.class);
+
+ /**
+ * The server session pool which we belong to.
+ */
+ private StdServerSessionPool serverSessionPool;
+
+ /**
+ * Our session resource.
+ */
+ private Session session;
+
+ /**
+ * Our XA session resource.
+ */
+ private XASession xaSession;
+
+ /**
+ * The transaction manager that we will use for transactions.
+ */
private TransactionManager tm;
-
+
/**
- * Create a <tt>StdServerSession</tt>.
- *
- * @param pool The server session pool which we belong to.
- * @param session Our session resource.
- * @param xaSession Our XA session resource.
+ * Use the session's XAResource directly if we have an JBossMQ XASession.
+ * this allows us to get around the TX timeout problem when you have
+ * extensive message processing.
+ */
+ private boolean useLocalTX;
+
+ /**
+ * The listener to delegate calls, to. In our case the container invoker.
+ */
+ private MessageListener delegateListener;
+
+ /**
+ * Create a <tt>StdServerSession</tt> .
*
- * @throws JMSException Transation manager was not found.
+ * @param pool The server session pool which we belong to.
+ * @param session Our session resource.
+ * @param xaSession Our XA session resource.
+ * @param delegateListener Listener to call when messages arrives.
+ * @param useLocalTX Will this session be used in a global TX (we can optimize with 1 phase commit)
+ * @throws JMSException Transation manager was not found.
+ * @exception JMSException Description of Exception
*/
StdServerSession(final StdServerSessionPool pool,
- final Session session,
- final XASession xaSession)
+ final Session session,
+ final XASession xaSession,
+ final MessageListener delegateListener,
+ boolean useLocalTX)
throws JMSException
{
// assert pool != null
@@ -80,179 +97,308 @@
this.serverSessionPool = pool;
this.session = session;
this.xaSession = xaSession;
-
- if (log.isDebugEnabled()) {
- log.debug("initializing (pool, session, xaSession): " +
- pool + ", " + session + ", " + xaSession);
- }
+ this.delegateListener = delegateListener;
+ if( xaSession == null )
+ useLocalTX = false;
+ this.useLocalTX = useLocalTX;
+
+ log.debug("initializing (pool, session, xaSession, useLocalTX): " +
+ pool + ", " + session + ", " + xaSession + ", " + useLocalTX);
+
+ // Set out self as message listener
+ if (xaSession != null)
+ xaSession.setMessageListener(this);
+ else
+ session.setMessageListener(this);
InitialContext ctx = null;
- try {
+ try
+ {
ctx = new InitialContext();
- tm = (TransactionManager)
- ctx.lookup(TransactionManagerService.JNDI_NAME);
+ tm = (TransactionManager) ctx.lookup(TransactionManagerService.JNDI_NAME);
}
- catch (Exception e) {
+ catch (Exception e)
+ {
throw new JMSException("Transation manager was not found");
}
- finally {
- if (ctx != null) {
- try {
+ finally
+ {
+ if (ctx != null)
+ {
+ try
+ {
ctx.close();
}
- catch (Exception ignore) {}
+ catch (Exception ignore)
+ {
+ }
}
}
}
-
+
// --- Impl of JMS standard API
-
+
/**
- * Returns the session.
+ * Returns the session. <p>
*
- * <p>This simply returns what it has fetched from the connection. It is
- * up to the jms provider to typecast it and have a private API to stuff
- * messages into it.
+ * This simply returns what it has fetched from the connection. It is up to
+ * the jms provider to typecast it and have a private API to stuff messages
+ * into it.
*
- * @return The session.
+ * @return The session.
+ * @exception JMSException Description of Exception
*/
public Session getSession() throws JMSException
{
- return session;
+ if (xaSession != null)
+ return xaSession;
+ else
+ return session;
}
-
+
+ //--- Protected parts, used by other in the package
+
/**
- * Start the session and begin consuming messages.
- *
- * @throws JMSException No listener has been specified.
+ * Runs in an own thread, basically calls the session.run(), it is up to the
+ * session to have been filled with messages and it will run against the
+ * listener set in StdServerSessionPool. When it has send all its messages it
+ * returns.
*/
- public void start() throws JMSException {
- log.debug("starting invokes on server session");
-
- if (session != null) {
- try {
- serverSessionPool.getExecutor().execute(this);
- }
- catch (InterruptedException ignore) {}
- }
- else {
- throw new JMSException("No listener has been specified");
- }
+ public void run()
+ {
+ if( log.isTraceEnabled() )
+ log.trace("running...");
+ if (xaSession != null)
+ xaSession.run();
+ else
+ session.run();
}
-
- //--- Protected parts, used by other in the package
-
/**
- * Runs in an own thread, basically calls the session.run(), it is up
- * to the session to have been filled with messages and it will run
- * against the listener set in StdServerSessionPool. When it has send
- * all its messages it returns.
+ * Will get called from session for each message stuffed into it.
*
- * HC: run() also starts a transaction with the TransactionManager and
- * enlists the XAResource of the JMS XASession if a XASession was
- * available. A good JMS implementation should provide the XASession
- * for use in the ASF. So we optimize for the case where we have an
- * XASession. So, for the case where we do not have an XASession and
- * the bean is not transacted, we have the unneeded overhead of creating
- * a Transaction. I'm leaving it this way since it keeps the code simpler
- * and that case should not be too common (JBossMQ provides XASessions).
+ * Starts a transaction with the TransactionManager
+ * and enlists the XAResource of the JMS XASession if a XASession was
+ * available. A good JMS implementation should provide the XASession for use
+ * in the ASF. So we optimize for the case where we have an XASession. So,
+ * for the case where we do not have an XASession and the bean is not
+ * transacted, we have the unneeded overhead of creating a Transaction. I'm
+ * leaving it this way since it keeps the code simpler and that case should
+ * not be too common (JBossMQ provides XASessions).
*/
- public void run() {
- log.debug("running...");
-
+ public void onMessage(Message msg)
+ {
+ boolean trace = log.isTraceEnabled();
+ if( trace )
+ log.trace("onMessage running (pool, session, xaSession, useLocalTX): " +
+ ", " + session + ", " + xaSession + ", " + useLocalTX);
+
+ // Used if run with useLocalTX if true
+ Xid localXid = null;
+ boolean localRollbackFlag=false;
+ // Used if run with useLocalTX if false
Transaction trans = null;
- try {
- tm.begin();
- trans = tm.getTransaction();
-
- if (xaSession != null) {
+ try
+ {
+
+ if (useLocalTX)
+ {
+ // Use JBossMQ One Phase Commit to commit the TX
+ localXid = new XidImpl();
XAResource res = xaSession.getXAResource();
- trans.enlistResource(res);
- if (log.isDebugEnabled()) {
- log.debug("XAResource '"+res+"' enlisted.");
+ res.start(localXid, XAResource.TMNOFLAGS);
+
+ if( trace )
+ log.trace("Using optimized 1p commit to control TX.");
+ }
+ else
+ {
+
+ // Use the TM to control the TX
+ tm.begin();
+ trans = tm.getTransaction();
+
+ if (xaSession != null)
+ {
+ XAResource res = xaSession.getXAResource();
+ trans.enlistResource(res);
+ if( trace )
+ log.trace("XAResource '" + res + "' enlisted.");
}
- }
-
+ }
+ //currentTransactionId = connection.spyXAResourceManager.startTx();
+
// run the session
- session.run();
+ //session.run();
+ // Call delegate listener
+ delegateListener.onMessage(msg);
}
- catch (Exception e) {
+ catch (Exception e)
+ {
log.error("session failed to run; setting rollback only", e);
-
- try {
- // The transaction will be rolledback in the finally
- trans.setRollbackOnly();
+
+ if (useLocalTX)
+ {
+ // Use JBossMQ One Phase Commit to commit the TX
+ localRollbackFlag = true;
}
- catch (Exception x) {
- log.error("failed to set rollback only", x);
+ else
+ {
+ // Mark for tollback TX via TM
+ try
+ {
+ // The transaction will be rolledback in the finally
+ if( trace )
+ log.trace("Using TM to mark TX for rollback.");
+ trans.setRollbackOnly();
+ }
+ catch (Exception x)
+ {
+ log.error("failed to set rollback only", x);
+ }
}
+
}
- finally {
- try {
- // Marked rollback
- if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK) {
- log.info("Rolling back JMS transaction");
- // actually roll it back
- trans.rollback();
-
- // NO XASession? then manually rollback.
- // This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && serverSessionPool.isTransacted()) {
- session.rollback();
+ finally
+ {
+ try
+ {
+ if (useLocalTX)
+ {
+ if( localRollbackFlag == true )
+ {
+ if( trace )
+ log.trace("Using optimized 1p commit to rollback TX.");
+
+ XAResource res = xaSession.getXAResource();
+ res.end(localXid, XAResource.TMSUCCESS);
+ res.rollback(localXid);
+
}
- } else if (trans.getStatus() == Status.STATUS_ACTIVE) {
- // Commit tx
- // This will happen if
- // a) everything goes well
- // b) app. exception was thrown
- trans.commit();
-
- // NO XASession? then manually commit. This is not so good but
- // it's the best we can do if we have no XASession.
- if (xaSession == null && serverSessionPool.isTransacted()) {
- session.commit();
+ else
+ {
+ if( trace )
+ log.trace("Using optimized 1p commit to commit TX.");
+
+ XAResource res = xaSession.getXAResource();
+ res.end(localXid, XAResource.TMSUCCESS);
+ res.commit(localXid, true);
}
}
+ else
+ {
+ // Use the TM to commit the Tx
+ // Marked rollback
+ if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
+ {
+ if( trace )
+ log.trace("Rolling back JMS transaction");
+ // actually roll it back
+ trans.rollback();
+
+ // NO XASession? then manually rollback.
+ // This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.rollback();
+ }
+ }
+ else if (trans.getStatus() == Status.STATUS_ACTIVE)
+ {
+ // Commit tx
+ // This will happen if
+ // a) everything goes well
+ // b) app. exception was thrown
+ if( trace )
+ log.trace("Commiting the JMS transaction");
+ trans.commit();
+
+ // NO XASession? then manually commit. This is not so good but
+ // it's the best we can do if we have no XASession.
+ if (xaSession == null && serverSessionPool.isTransacted())
+ {
+ session.commit();
+ }
+ }
+ }
}
- catch (Exception e) {
+ catch (Exception e)
+ {
log.error("failed to commit/rollback", e);
}
-
+
StdServerSession.this.recycle();
}
-
- log.debug("done");
+ if( trace )
+ log.trace("onMessage done");
}
-
+
/**
- * This method is called by the ServerSessionPool when it is ready to
- * be recycled intot the pool
+ * Start the session and begin consuming messages.
+ *
+ * @throws JMSException No listener has been specified.
*/
- void recycle()
+ public void start() throws JMSException
{
- serverSessionPool.recycle(this);
+ if( log.isTraceEnabled() )
+ log.trace("starting invokes on server session");
+
+ if (session != null)
+ {
+ try
+ {
+ serverSessionPool.getExecutor().execute(this);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ else
+ {
+ throw new JMSException("No listener has been specified");
+ }
}
-
+
/**
* Called by the ServerSessionPool when the sessions should be closed.
*/
- void close() {
- if (session != null) {
- try {
+ void close()
+ {
+ if (session != null)
+ {
+ try
+ {
session.close();
- } catch (Exception ignore) {}
+ }
+ catch (Exception ignore)
+ {
+ }
session = null;
}
- if (xaSession != null) {
- try {
+ if (xaSession != null)
+ {
+ try
+ {
xaSession.close();
- } catch (Exception ignore) {}
+ }
+ catch (Exception ignore)
+ {
+ }
xaSession = null;
}
-
+
log.debug("closed");
}
+
+ /**
+ * This method is called by the ServerSessionPool when it is ready to be
+ * recycled intot the pool
+ */
+ void recycle()
+ {
+ serverSessionPool.recycle(this);
+ }
+
}
1.4.6.3 +208 -151 jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java
Index: StdServerSessionPool.java
===================================================================
RCS file: /cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPool.java,v
retrieving revision 1.4.6.2
retrieving revision 1.4.6.3
diff -u -r1.4.6.2 -r1.4.6.3
--- StdServerSessionPool.java 2001/08/23 03:58:15 1.4.6.2
+++ StdServerSessionPool.java 2001/12/14 03:22:11 1.4.6.3
@@ -1,128 +1,132 @@
/*
- * Copyright (c) 2000 Peter Antman Tim <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
-import java.util.List;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.QueueConnection;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
-import javax.jms.MessageListener;
+import javax.jms.Session;
import javax.jms.TopicConnection;
-import javax.jms.XATopicConnection;
-import javax.jms.QueueConnection;
import javax.jms.XAQueueConnection;
-import javax.jms.Session;
-import javax.jms.XASession;
import javax.jms.XAQueueSession;
+import javax.jms.XASession;
+import javax.jms.XATopicConnection;
import javax.jms.XATopicSession;
-
-import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
-import EDU.oswego.cs.dl.util.concurrent.Executor;
-import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
-import org.apache.log4j.Category;
+import org.jboss.logging.Logger;
/**
- * Implementation of ServerSessionPool.
+ * Implementation of ServerSessionPool. <p>
*
- * <p>Created: Thu Dec 7 17:02:03 2000
+ * Created: Thu Dec 7 17:02:03 2000
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @version $Revision: 1.4.6.2 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> .
+ * @version $Revision: 1.4.6.3 $
*/
public class StdServerSessionPool
- implements ServerSessionPool
+ implements ServerSessionPool
{
- /** The default size of the pool. */
- private static final int DEFAULT_POOL_SIZE = 15;
+ /**
+ * The default size of the pool.
+ */
+ private final static int DEFAULT_POOL_SIZE = 15;
- /** The thread group which session workers will run. */
+ /**
+ * The thread group which session workers will run.
+ */
private static ThreadGroup threadGroup =
- new ThreadGroup("ASF Session Pool Threads");
+ new ThreadGroup("ASF Session Pool Threads");
- /** Instance logger. */
- private final Category log = Category.getInstance(this.getClass());
+ /**
+ * Instance logger.
+ */
+ private final Logger log = Logger.getLogger(this.getClass());
- /** The size of the pool. */
+ /**
+ * The size of the pool.
+ */
private int poolSize;
- /** The message acknowledgment mode. */
+ /**
+ * The message acknowledgment mode.
+ */
private int ack;
- /** True if this is a transacted session. */
+ /**
+ * Is the bean container managed?
+ */
+ private boolean useLocalTX;
+
+ /**
+ * True if this is a transacted session.
+ */
private boolean transacted;
- /** The session connection. */
+ /**
+ * The session connection.
+ */
private Connection con;
- /** The message listener for the session. */
+ /**
+ * The message listener for the session.
+ */
private MessageListener listener;
- /** The list of ServerSessions. */
+ /**
+ * The list of ServerSessions.
+ */
private List sessionPool;
- /** The executor for processing messages? */
+ /**
+ * The executor for processing messages?
+ */
private PooledExecutor executor;
- /** Used to signal when the Pool is being closed down */
+ /**
+ * Used to signal when the Pool is being closed down
+ */
private boolean closing = false;
- /** Used during close down to wait for all server sessions to be returned and closed.*/
- private int numServerSessions = 0;
-
/**
- * Construct a <tt>StdServerSessionPool</tt> using the default
- * pool size.
- *
- * @param con
- * @param transacted
- * @param ack
- * @param listener
+ * Used during close down to wait for all server sessions to be returned and
+ * closed.
*/
- public StdServerSessionPool(final Connection con,
- final boolean transacted,
- final int ack,
- final MessageListener listener)
- throws JMSException
- {
- this(con, transacted, ack, listener, DEFAULT_POOL_SIZE);
- }
+ private int numServerSessions = 0;
/**
- * Construct a <tt>StdServerSessionPool</tt> using the default
- * pool size.
+ * Construct a <tt>StdServerSessionPool</tt> using the default pool size.
*
- * @param con
- * @param transacted
- * @param ack
- * @param listener
- * @param maxSession
+ * @param con connection to get sessions from
+ * @param transacted transaction mode when not XA (
+ * @param ack ackmode when not XA
+ * @param listener the listener the sessions will call
+ * @param maxSession maximum number of sessions in the pool
+ * @param isuseLocalTX Description of Parameter
+ * @exception JMSException Description of Exception
*/
public StdServerSessionPool(final Connection con,
- final boolean transacted,
- final int ack,
- final MessageListener listener,
- final int maxSession)
- throws JMSException
+ final boolean transacted,
+ final int ack,
+ final boolean useLocalTX,
+ final MessageListener listener,
+ final int maxSession)
+ throws JMSException
{
this.con = con;
this.ack = ack;
@@ -130,19 +134,29 @@
this.transacted = transacted;
this.poolSize = maxSession;
this.sessionPool = new ArrayList(maxSession);
+ this.useLocalTX = useLocalTX;
// setup the worker pool
executor = new PooledExecutor(poolSize);
executor.setMinimumPoolSize(0);
- executor.setKeepAliveTime(1000*30);
+ executor.setKeepAliveTime(1000 * 30);
executor.waitWhenBlocked();
- executor.setThreadFactory(new ThreadFactory() {
+ executor.setThreadFactory(
+ new ThreadFactory()
+ {
private volatile int count = 0;
-
- public Thread newThread(final Runnable command) {
+
+ /**
+ * #Description of the Method
+ *
+ * @param command Description of Parameter
+ * @return Description of the Returned Value
+ */
+ public Thread newThread(final Runnable command)
+ {
return new Thread(threadGroup,
- command,
- "Thread Pool Worker-" + count++);
+ command,
+ "Thread Pool Worker-" + count++);
}
});
@@ -155,98 +169,78 @@
/**
* Get a server session.
- *
- * @return A server session.
*
- * @throws JMSException Failed to get a server session.
+ * @return A server session.
+ * @throws JMSException Failed to get a server session.
*/
public ServerSession getServerSession() throws JMSException
{
- log.debug("getting a server session");
+ if( log.isTraceEnabled() )
+ log.trace("getting a server session");
ServerSession session = null;
- try {
- while (true) {
- synchronized (sessionPool) {
- if(closing){
+ try
+ {
+ while (true)
+ {
+ synchronized (sessionPool)
+ {
+ if (closing)
+ {
throw new JMSException("Cannot get session after pool has been closed down.");
}
- else if (sessionPool.size() > 0) {
+ else if (sessionPool.size() > 0)
+ {
session = (ServerSession)sessionPool.remove(0);
break;
}
- else {
- try {
+ else
+ {
+ try
+ {
sessionPool.wait();
+ }
+ catch (InterruptedException ignore)
+ {
}
- catch (InterruptedException ignore) {}
}
}
}
}
- catch (Exception e) {
+ catch (Exception e)
+ {
throw new JMSException("Failed to get a server session: " + e);
}
// assert session != null
-
- log.debug("using server session: " + session);
+ if( log.isTraceEnabled() )
+ log.trace("using server session: " + session);
return session;
}
- // --- Protected messages for StdServerSession to use
-
- /**
- * Returns true if this server session is transacted.
- */
- boolean isTransacted() {
- return transacted;
- }
-
- /**
- * Recycle a server session.
- */
- void recycle(StdServerSession session) {
- synchronized (sessionPool) {
- if(closing){
- session.close();
- numServerSessions--;
- if(numServerSessions == 0) //notify clear thread.
- sessionPool.notifyAll();
- }else{
- sessionPool.add(session);
- sessionPool.notifyAll();
- log.debug("recycled server session: " + session);
- }
- }
- }
-
- /**
- * Get the executor we are using.
- */
- Executor getExecutor() {
- return executor;
- }
-
/**
* Clear the pool, clear out both threads and ServerSessions,
* connection.stop() should be run before this method.
*/
- public void clear() {
- synchronized (sessionPool) {
+ public void clear()
+ {
+ synchronized (sessionPool)
+ {
// FIXME - is there a runaway condition here. What if a
// ServerSession are taken by a ConnecionConsumer? Should we set
// a flag somehow so that no ServerSessions are recycled and the
// ThreadPool won't leave any more threads out.
closing = true;
- if (log.isDebugEnabled()) {
+ if (log.isDebugEnabled())
+ {
log.debug("Clearing " + sessionPool.size() +
- " from ServerSessionPool");
+ " from ServerSessionPool");
}
Iterator iter = sessionPool.iterator();
- while (iter.hasNext()) {
+ while (iter.hasNext())
+ {
StdServerSession ses = (StdServerSession)iter.next();
// Should we do anything to the server session?
ses.close();
@@ -261,9 +255,69 @@
executor.shutdownAfterProcessingCurrentlyQueuedTasks();
//wait for all server sessions to be returned.
- synchronized(sessionPool){
- while(numServerSessions > 0)
- try{ sessionPool.wait(); }catch(InterruptedException ignore){}
+ synchronized (sessionPool)
+ {
+ while (numServerSessions > 0)
+ {
+ try
+ {
+ sessionPool.wait();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the executor we are using.
+ *
+ * @return The Executor value
+ */
+ Executor getExecutor()
+ {
+ return executor;
+ }
+
+ // --- Protected messages for StdServerSession to use
+
+ /**
+ * Returns true if this server session is transacted.
+ *
+ * @return The Transacted value
+ */
+ boolean isTransacted()
+ {
+ return transacted;
+ }
+
+ /**
+ * Recycle a server session.
+ *
+ * @param session Description of Parameter
+ */
+ void recycle(StdServerSession session)
+ {
+ synchronized (sessionPool)
+ {
+ if (closing)
+ {
+ session.close();
+ numServerSessions--;
+ if (numServerSessions == 0)
+ {
+ //notify clear thread.
+ sessionPool.notifyAll();
+ }
+ }
+ else
+ {
+ sessionPool.add(session);
+ sessionPool.notifyAll();
+ if( log.isTraceEnabled() )
+ log.trace("recycled server session: " + session);
+ }
}
}
@@ -271,44 +325,47 @@
private void init() throws JMSException
{
- for (int index = 0; index < poolSize; index++) {
+ for (int index = 0; index < poolSize; index++)
+ {
// Here is the meat, that MUST follow the spec
Session ses = null;
XASession xaSes = null;
log.debug("initializing with connection: " + con);
- if (con instanceof XATopicConnection) {
+ if (con instanceof XATopicConnection)
+ {
xaSes = ((XATopicConnection)con).createXATopicSession();
ses = ((XATopicSession)xaSes).getTopicSession();
}
- else if (con instanceof XAQueueConnection) {
+ else if (con instanceof XAQueueConnection)
+ {
xaSes = ((XAQueueConnection)con).createXAQueueSession();
ses = ((XAQueueSession)xaSes).getQueueSession();
}
- else if (con instanceof TopicConnection) {
+ else if (con instanceof TopicConnection)
+ {
ses = ((TopicConnection)con).createTopicSession(transacted, ack);
log.warn("Using a non-XA TopicConnection. " +
- "It will not be able to participate in a Global UOW");
+ "It will not be able to participate in a Global UOW");
}
- else if (con instanceof QueueConnection) {
+ else if (con instanceof QueueConnection)
+ {
ses = ((QueueConnection)con).createQueueSession(transacted, ack);
log.warn("Using a non-XA QueueConnection. " +
- "It will not be able to participate in a Global UOW");
+ "It will not be able to participate in a Global UOW");
}
- else {
+ else
+ {
// should never happen really
log.error("Connection was not reconizable: " + con);
throw new JMSException("Connection was not reconizable: " + con);
}
- // This might not be totaly spec compliant since it says that app
- // server should create as many message listeners its needs.
- log.debug("setting session listener: " + listener);
- ses.setMessageListener(listener);
+ // create the server session and add it to the pool - it is up to the
+ // server session to set the listener
+ StdServerSession serverSession = new StdServerSession(this, ses, xaSes,
listener, useLocalTX);
- // create the server session and add it to the pool
- ServerSession serverSession = new StdServerSession(this, ses, xaSes);
sessionPool.add(serverSession);
numServerSessions++;
log.debug("added server session to the pool: " + serverSession);
1.3.6.3 +32 -45
jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java
Index: StdServerSessionPoolFactory.java
===================================================================
RCS file:
/cvsroot/jboss/jboss/src/main/org/jboss/jms/asf/StdServerSessionPoolFactory.java,v
retrieving revision 1.3.6.2
retrieving revision 1.3.6.3
diff -u -r1.3.6.2 -r1.3.6.3
--- StdServerSessionPoolFactory.java 2001/08/23 03:58:15 1.3.6.2
+++ StdServerSessionPoolFactory.java 2001/12/14 03:22:11 1.3.6.3
@@ -1,93 +1,80 @@
/*
- * Copyright (c) 2000 Peter Antman DN <[EMAIL PROTECTED]>
+ * JBoss, the OpenSource J2EE webOS
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2 of the License, or (at your option) any later version
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
*/
package org.jboss.jms.asf;
import java.io.Serializable;
-
-import javax.jms.ServerSessionPool;
-import javax.jms.MessageListener;
import javax.jms.Connection;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.ServerSessionPool;
+
/**
- * An implementation of ServerSessionPoolFactory.
+ * An implementation of ServerSessionPoolFactory. <p>
*
- * <p>Created: Fri Dec 22 09:47:41 2000
+ * Created: Fri Dec 22 09:47:41 2000
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @version $Revision: 1.3.6.2 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @author <a href="mailto:[EMAIL PROTECTED]">Hiram Chirino</a> .
+ * @version $Revision: 1.3.6.3 $
*/
public class StdServerSessionPoolFactory
- implements ServerSessionPoolFactory, Serializable
+ implements ServerSessionPoolFactory, Serializable
{
- /** The name of this factory. */
+ /**
+ * The name of this factory.
+ */
private String name;
/**
- * Construct a <tt>StdServerSessionPoolFactory</tt>.
+ * Construct a <tt>StdServerSessionPoolFactory</tt> .
*/
- public StdServerSessionPoolFactory() {
+ public StdServerSessionPoolFactory()
+ {
super();
}
/**
* Set the name of the factory.
*
- * @param name The name of the factory.
+ * @param name The name of the factory.
*/
- public void setName(final String name) {
+ public void setName(final String name)
+ {
this.name = name;
}
-
+
/**
* Get the name of the factory.
*
- * @return The name of the factory.
+ * @return The name of the factory.
*/
- public String getName() {
+ public String getName()
+ {
return name;
}
/**
- * Create a new <tt>ServerSessionPool</tt>.
+ * Create a new <tt>ServerSessionPool</tt> .
*
* @param con
* @param maxSession
* @param isTransacted
* @param ack
* @param listener
- * @return A new pool.
- *
+ * @param useLocalTX forwarded unchanged to the StdServerSessionPool constructor
+ * @return A new pool.
* @throws JMSException
+ * @exception javax.jms.JMSException Description of Exception
*/
- public ServerSessionPool getServerSessionPool(final Connection con,
- final int maxSession,
- final boolean isTransacted,
- final int ack,
- final MessageListener listener)
- throws JMSException
+ public ServerSessionPool getServerSessionPool(Connection con, int maxSession,
+ boolean isTransacted, int ack, boolean useLocalTX, MessageListener listener)
throws JMSException
{
- ServerSessionPool pool = (ServerSessionPool)
- new StdServerSessionPool(con,
- isTransacted,
- ack,
- listener,
- maxSession);
+ ServerSessionPool pool = new StdServerSessionPool(con, isTransacted, ack,
useLocalTX, listener, maxSession);
return pool;
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development