User: starksm
Date: 01/12/13 19:22:11
Modified: src/main/org/jboss/ejb/plugins/jms Tag: Branch_2_4
JMSContainerInvoker.java
Log:
Integrate changes from 3.0 to improve the MDB/ASF layer. This includes
support for the dead message queue for repeated MDB.onMessage failures.
Revision Changes Path
No revision
No revision
1.12.4.6 +864 -518
jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java
Index: JMSContainerInvoker.java
===================================================================
RCS file:
/cvsroot/jboss/jboss/src/main/org/jboss/ejb/plugins/jms/JMSContainerInvoker.java,v
retrieving revision 1.12.4.5
retrieving revision 1.12.4.6
diff -u -r1.12.4.5 -r1.12.4.6
--- JMSContainerInvoker.java 2001/11/10 04:48:13 1.12.4.5
+++ JMSContainerInvoker.java 2001/12/14 03:22:11 1.12.4.6
@@ -5,481 +5,594 @@
* See terms of license at gnu.org.
*/
package org.jboss.ejb.plugins.jms;
-
-import java.util.Collection;
-import java.util.Hashtable;
import java.lang.reflect.Method;
import java.security.Principal;
-import javax.jms.Connection;
-import javax.jms.ConnectionConsumer;
-import javax.jms.Destination;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.ServerSessionPool;
-import javax.jms.Topic;
-import javax.jms.TopicConnection;
+import java.util.Collection;
+import java.util.Hashtable;
+import javax.ejb.EJBHome;
import javax.ejb.EJBMetaData;
-import javax.ejb.EJBHome;
import javax.ejb.EJBObject;
-import javax.naming.Name;
-import javax.naming.InitialContext;
-import javax.naming.Context;
-import javax.naming.NamingException;
-import javax.naming.NameNotFoundException;
-
-import javax.transaction.Status;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
+import javax.jms.*;
+import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
-import javax.management.MBeanServer;
import javax.management.ObjectName;
+import javax.naming.Context;
+import javax.naming.InitialContext;
-import org.w3c.dom.Element;
+import javax.naming.Name;
+import javax.naming.NamingException;
-import org.apache.log4j.Category;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
-import org.jboss.jms.ConnectionFactoryHelper;
-import org.jboss.ejb.MethodInvocation;
+import org.jboss.deployment.DeploymentException;
import org.jboss.ejb.Container;
-import org.jboss.ejb.ContainerInvokerContainer;
-import org.jboss.ejb.Interceptor;
import org.jboss.ejb.ContainerInvoker;
-import org.jboss.deployment.DeploymentException;
-import org.jboss.metadata.XmlLoadable;
-import org.jboss.metadata.MetaData;
-import org.jboss.metadata.MessageDrivenMetaData;
-import org.jboss.jms.jndi.JMSProviderAdapter;
+import org.jboss.ejb.MethodInvocation;
+import org.jboss.jms.ConnectionFactoryHelper;
import org.jboss.jms.asf.ServerSessionPoolFactory;
import org.jboss.jms.asf.StdServerSessionPool;
+import org.jboss.jms.jndi.JMSProviderAdapter;
+import org.jboss.logging.Logger;
+import org.jboss.metadata.MessageDrivenMetaData;
+import org.jboss.metadata.MetaData;
+import org.jboss.metadata.XmlLoadable;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
/**
* ContainerInvoker for JMS MessageDrivenBeans, based on JRMPContainerInvoker.
*
- * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a>.
- * @author <a href="mailto:[EMAIL PROTECTED]">Rickard �berg</a>
- * @author <a href="mailto:[EMAIL PROTECTED]">Sebastien Alborini</a>
- * @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
- * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
- * @version $Revision: 1.12.4.5 $
+ * @author <a href="mailto:[EMAIL PROTECTED]">Peter Antman</a> .
+ * @author <a href="mailto:[EMAIL PROTECTED]">Rickard �berg</a>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Sebastien Alborini
+ * </a>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Marc Fleury</a>
+ * @author <a href="mailto:[EMAIL PROTECTED]">Jason Dillon</a>
+ * @version $Revision: 1.12.4.6 $
*/
public class JMSContainerInvoker
- implements ContainerInvoker, XmlLoadable
+ implements ContainerInvoker, XmlLoadable
{
// Constants -----------------------------------------------------
-
- /** {@link MessageListener#onMessage} reference. */
- protected static /* final */ Method ON_MESSAGE;
-
+
/**
- * Initialize the ON_MESSAGE reference.
+ * {@link MessageListener#onMessage} reference.
*/
- static {
- try {
- final Class type = MessageListener.class;
- final Class arg = Message.class;
- ON_MESSAGE = type.getMethod("onMessage", new Class[] { arg });
- }
- catch (Exception e) {
- e.printStackTrace();
- throw new ExceptionInInitializerError(e);
- }
- }
-
- /** Instance logger. */
- private final Category log = Category.getInstance(this.getClass());
-
+ protected static Method ON_MESSAGE;
+
+ /**
+ * Default destination type. Used when no message-driven-destination is given
+ * in ejb-jar, and a lookup of destinationJNDI from jboss.xml is not
+ * successfull. Default value: javax.jms.Topic.
+ */
+ protected final static String DEFAULT_DESTINATION_TYPE = "javax.jms.Topic";
+
// Attributes ----------------------------------------------------
-
- protected boolean optimize; // = false;
+
+ /**
+ * Description of the Field
+ */
+ protected boolean optimize;
+ // = false;
+ /**
+ * Maximu number provider is allowed to stuff into a session.
+ */
protected int maxMessagesNr = 1;
+ /**
+ * Maximun pool size of server sessions.
+ */
protected int maxPoolSize = 15;
+ /**
+ * Time to wait before retrying to reconnect a lost connection.
+ */
+ protected long reconnectInterval = 10000;
+ /**
+ * If Dead letter queue should be used or not.
+ */
+ protected boolean useDLQ = false;
+ /**
+ * JNDI name of the provider adapter.
+ * @see org.jboss.jms.jndi.JMSProviderAdapter
+ */
protected String providerAdapterJNDI;
+ /**
+ * JNDI name of the server session factory.
+ * @see org.jboss.jms.asf.ServerSessionPoolFactory
+ */
protected String serverSessionPoolFactoryJNDI;
+ /**
+ * JMS acknowledge mode, used when session is not XA.
+ */
protected int acknowledgeMode;
+ /**
+ * escription of the Field
+ */
+ protected boolean isContainerManagedTx;
+ /**
+ * Description of the Field
+ */
+ protected boolean isNotSupportedTx;
+ /**
+ * The container.
+ */
protected Container container;
+ /**
+ * The JMS connection.
+ */
protected Connection connection;
+ /**
+ * TH JMS connection consumer.
+ */
protected ConnectionConsumer connectionConsumer;
+ /**
+ * Description of the Field
+ */
protected TransactionManager tm;
+ /**
+ * Description of the Field
+ */
protected ServerSessionPool pool;
+ /**
+ * Description of the Field
+ */
protected ExceptionListenerImpl exListener;
+ /**
+ * Description of the Field
+ */
protected String beanName;
+ /**
+ * Dead letter queue handler.
+ */
+ protected DLQHandler dlqHandler;
+ /**
+ * DLQConfig element from MDBConfig element from jboss.xml.
+ */
+ protected Element dlqConfig;
+
+ /**
+ * Instance logger.
+ */
+ private final Logger log = Logger.getLogger(this.getClass());
+ // ContainerService implementation -------------------------------
+
+ /**
+ * Set the container for which this is an invoker to.
+ *
+ * @param container The container for which this is an invoker to.
+ */
+ public void setContainer(final Container container)
+ {
+ this.container = container;
+ //jndiName = container.getBeanMetaData().getJndiName();
+ }
+
// Static --------------------------------------------------------
-
+
// Constructors --------------------------------------------------
-
+
// Public --------------------------------------------------------
-
- public void setOptimized(final boolean optimize) {
+
+ /**
+ * Sets the Optimized attribute of the JMSContainerInvoker object
+ *
+ * @param optimize The new Optimized value
+ */
+ public void setOptimized(final boolean optimize)
+ {
log.debug("Container Invoker optimize set to " + optimize);
this.optimize = optimize;
- }
-
- public boolean isOptimized() {
- log.debug("Optimize in action: " + optimize);
- return optimize;
}
-
- public EJBMetaData getEJBMetaData() {
- throw new Error("Not valid for MessageDriven beans");
- }
-
+
// ContainerInvoker implementation
-
- public EJBHome getEJBHome() {
- throw new Error("Not valid for MessageDriven beans");
- }
-
- public EJBObject getStatelessSessionEJBObject() {
- throw new Error("Not valid for MessageDriven beans");
- }
-
- public EJBObject getStatefulSessionEJBObject(Object id) {
+
+ /**
+ * Gets the EJBHome attribute of the JMSContainerInvoker object
+ *
+ * @return The EJBHome value
+ */
+ public EJBHome getEJBHome()
+ {
throw new Error("Not valid for MessageDriven beans");
}
-
- public EJBObject getEntityEJBObject(Object id) {
+
+ /**
+ * Gets the EJBMetaData attribute of the JMSContainerInvoker object
+ *
+ * @return The EJBMetaData value
+ */
+ public EJBMetaData getEJBMetaData()
+ {
throw new Error("Not valid for MessageDriven beans");
}
-
- public Collection getEntityCollection(Collection ids) {
+
+ /**
+ * Gets the EntityCollection attribute of the JMSContainerInvoker object
+ *
+ * @param ids Description of Parameter
+ * @return The EntityCollection value
+ */
+ public Collection getEntityCollection(Collection ids)
+ {
throw new Error("Not valid for MessageDriven beans");
}
-
- public Object invoke(Object id,
- Method m,
- Object[] args,
- Transaction tx,
- Principal identity,
- Object credential)
- throws Exception
+
+ /**
+ * Gets the EntityEJBObject attribute of the JMSContainerInvoker object
+ *
+ * @param id Description of Parameter
+ * @return The EntityEJBObject value
+ */
+ public EJBObject getEntityEJBObject(Object id)
{
- MethodInvocation mi =
- new MethodInvocation(id, m, args, tx, identity, credential);
-
- // Set the right context classloader
- ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(container.getClassLoader());
-
- try {
- return container.invoke(mi);
- }
- finally {
- Thread.currentThread().setContextClassLoader(oldCl);
- }
+ throw new Error("Not valid for MessageDriven beans");
}
-
- // ContainerService implementation -------------------------------
-
+
/**
- * Set the container for which this is an invoker to.
+ * Gets the StatefulSessionEJBObject attribute of the JMSContainerInvoker
+ * object
*
- * @param container The container for which this is an invoker to.
+ * @param id Description of Parameter
+ * @return The StatefulSessionEJBObject value
*/
- public void setContainer(final Container container)
+ public EJBObject getStatefulSessionEJBObject(Object id)
{
- this.container = container;
- //jndiName = container.getBeanMetaData().getJndiName();
+ throw new Error("Not valid for MessageDriven beans");
}
-
+
/**
- * Return the JMSProviderAdapter that should be used.
+ * Gets the StatelessSessionEJBObject attribute of the JMSContainerInvoker
+ * object
*
- * @return The JMSProviderAdapter to use.
+ * @return The StatelessSessionEJBObject value
*/
- protected JMSProviderAdapter getJMSProviderAdapter()
- throws NamingException
+ public EJBObject getStatelessSessionEJBObject()
{
- Context context = new InitialContext();
- try {
- log.debug("looking up provider adapter: " + providerAdapterJNDI);
- return (JMSProviderAdapter)context.lookup(providerAdapterJNDI);
- }
- finally {
- context.close();
- }
+ throw new Error("Not valid for MessageDriven beans");
}
-
+
/**
- * Parse the JNDI suffix from the given JNDI name.
+ * Gets the Optimized attribute of the JMSContainerInvoker object
*
- * @param jndiname The JNDI name used to lookup the destination.
- * @param defaultSuffix The default suffix to use if parsing fails.
- * @return The parsed suffix or the defaultSuffix
+ * @return The Optimized value
*/
- protected String parseJndiSuffix(final String jndiname,
- final String defautSuffix)
+ public boolean isOptimized()
{
- // jndiSuffix is merely the name that the user has given the MDB.
- // since the jndi name contains the message type I have to split
- // at the "/" if there is no slash then I use the entire jndi name...
- String jndiSuffix = "";
- if (jndiname != null) {
- int indexOfSlash = jndiname.indexOf("/");
- if (indexOfSlash != -1) {
- jndiSuffix = jndiname.substring(indexOfSlash+1);
- } else {
- jndiSuffix = jndiname;
- }
- }
- else {
- // if the jndi name from jboss.xml is null then lets use the ejbName
- jndiSuffix = defautSuffix;
- }
-
- return jndiSuffix;
+ log.debug("Optimize in action: " + optimize);
+ return optimize;
}
-
+
/**
- * Create and or lookup a JMS destination.
- *
- * @param type Either javax.jms.Queue or javax.jms.Topic.
- * @param ctx The naming context to lookup destinations from.
- * @param jndiName The name to use when looking up destinations.
- * @param jndiSuffix The name to use when creating destinations.
- * @return The destination.
- *
- * @throws IllegalArgumentException Type is not Queue or Topic.
+ * Take down all fixtures.
*/
- protected Destination createDestination(final Class type,
- final Context ctx,
- final String jndiName,
- final String jndiSuffix)
- throws Exception
+ public void destroy()
{
- try {
- // first try to look it up
- return (Destination)ctx.lookup(jndiName);
- }
- catch (NamingException e) {
- // if the lookup failes, the try to create it
- log.warn("destination not found: " + jndiName + " reason: " + e);
- log.warn("creating a new temporary destination: " + jndiName);
- //
- // attempt to create the destination (note, this is very
- // very, very unportable).
- //
- MBeanServer server = (MBeanServer)
- MBeanServerFactory.findMBeanServer(null).iterator().next();
-
- String methodName;
- if (type == Topic.class) {
- methodName = "createTopic";
- }
- else if (type == Queue.class) {
- methodName = "createQueue";
- }
- else {
- // type was not a Topic or Queue, bad user
- throw new IllegalArgumentException
- ("expected javax.jms.Queue or javax.jms.Topic: " + type);
- }
-
- // invoke the server to create the destination
- server.invoke(new ObjectName("JBossMQ", "service", "Server"),
- methodName,
- new Object[] { jndiSuffix },
- new String[] { "java.lang.String" });
-
- // try to look it up again
- return (Destination)ctx.lookup(jndiName);
+ log.debug("Destroying JMSContainerInvoker for bean " + beanName);
+
+ // Take down DLQ
+ if ( dlqHandler != null)
+ {
+ dlqHandler.destroy();
+ }
+
+ // close the connection consumer
+ try
+ {
+ if (connectionConsumer != null)
+ {
+ connectionConsumer.close();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Could not close consumer", e);
}
+
+ // clear the server session pool (if it is clearable)
+ try
+ {
+ if (pool instanceof StdServerSessionPool)
+ {
+ StdServerSessionPool p = (StdServerSessionPool)pool;
+ p.clear();
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Could not clear ServerSessionPool", e);
+ }
+
+ // close the connection
+ if (connection != null)
+ {
+ try
+ {
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ log.error("Could not close connection", e);
+ }
+ }
}
-
+
/**
- * Create a server session pool for the given connection.
+ * XmlLoadable implementation.
*
- * @param connection The connection to use.
- * @param maxSession The maximum number of sessions.
- * @param isTransacted True if the sessions are transacted.
- * @param ack The session acknowledgement mode.
- * @param listener The message listener.
- * @return A server session pool.
+ * FIXME - we ought to move all config into MDBConfig, but I do not
+ * do that now due to backward compatibility.
*
- * @throws JMSException
+ * @param element Description of Parameter
+ * @exception DeploymentException Description of Exception
*/
- protected ServerSessionPool
- createSessionPool(final Connection connection,
- final int maxSession,
- final boolean isTransacted,
- final int ack,
- final MessageListener listener)
- throws NamingException, JMSException
+ public void importXml(Element element) throws DeploymentException
{
- ServerSessionPool pool;
- Context context = new InitialContext();
-
- try {
- // first lookup the factory
- log.debug("looking up session pool factory: " +
- serverSessionPoolFactoryJNDI);
- ServerSessionPoolFactory factory = (ServerSessionPoolFactory)
- context.lookup(serverSessionPoolFactoryJNDI);
-
- // the create the pool
- pool = factory.getServerSessionPool
- (connection, maxSession, isTransacted, ack, listener);
+ try
+ {
+ String maxMessages = MetaData.getElementContent
+ (MetaData.getUniqueChild(element, "MaxMessages"));
+ maxMessagesNr = Integer.parseInt(maxMessages);
+
+ String maxSize = MetaData.getElementContent
+ (MetaData.getUniqueChild(element, "MaximumSize"));
+ maxPoolSize = Integer.parseInt(maxSize);
+
+ Element mdbConfig = MetaData.getUniqueChild(element, "MDBConfig");
+
+ String reconnect = MetaData.getElementContent
+ (MetaData.getUniqueChild(mdbConfig, "ReconnectIntervalSec"));
+ reconnectInterval = Long.parseLong(reconnect)*1000;
+
+ // Get Dead letter queue config - and save it for later use
+ Element dlqEl = MetaData.getOptionalChild(mdbConfig, "DLQConfig");
+ if (dlqEl != null)
+ {
+ dlqConfig = (Element)((Node)dlqEl).cloneNode(true);
+ useDLQ = true;
+ }
+ else
+ {
+ useDLQ = false;
+ }
+
}
- finally {
- context.close();
+ catch (NumberFormatException e)
+ {
+ //Noop will take default value
}
-
- return pool;
+ catch (DeploymentException e)
+ {
+ //Noop will take default value
+ }
+
+ // If these are not found we will get a DeploymentException, I hope
+ providerAdapterJNDI = MetaData.getElementContent
+ (MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
+
+ serverSessionPoolFactoryJNDI = MetaData.getElementContent
+ (MetaData.getUniqueChild(element, "ServerSessionPoolFactoryJNDI"));
+
+ // Check java:/ prefix
+ if (!providerAdapterJNDI.startsWith("java:/"))
+ {
+ providerAdapterJNDI = "java:/" + providerAdapterJNDI;
+ }
+
+ if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
+ {
+ serverSessionPoolFactoryJNDI = "java:/" + serverSessionPoolFactoryJNDI;
+ }
}
/**
- * Initialize the container invoker. Sets up a connection, a server
- * session pool and a connection consumer for the configured destination.
+ * Initialize the container invoker. Sets up a connection, a server session
+ * pool and a connection consumer for the configured destination.
*
- * @throws Exception Failed to initalize.
+ * @throws Exception Failed to initalize.
*/
public void init() throws Exception
{
log.debug("initializing");
-
+
+ // Set up Dead Letter Queue handler
+ if (useDLQ)
+ {
+ dlqHandler = new DLQHandler();
+ dlqHandler.importXml(dlqConfig);
+ dlqHandler.init();
+ }
+
// Store TM reference locally - should we test for CMT Required
tm = container.getTransactionManager();
-
+
// Get configuration information - from EJB-xml
MessageDrivenMetaData config =
- ((MessageDrivenMetaData)container.getBeanMetaData());
-
+ ((MessageDrivenMetaData)container.getBeanMetaData());
+
// Selector
String messageSelector = config.getMessageSelector();
-
- // Queue or Topic
+
+ // Queue or Topic - optional unfortunately
String destinationType = config.getDestinationType();
-
+
// Bean Name
beanName = config.getEjbName();
-
- // Is containermanages TX (not used?)
- boolean isContainerManagedTx = config.isContainerManagedTx();
+
+ // Is container managed?
+ isContainerManagedTx = config.isContainerManagedTx();
acknowledgeMode = config.getAcknowledgeMode();
-
+ isNotSupportedTx = config.getMethodTransactionType("onMessage",
+ new Class[] {Message.class}, false) == MetaData.TX_NOT_SUPPORTED;
+
// Get configuration data from jboss.xml
String destinationJNDI = config.getDestinationJndiName();
String user = config.getUser();
String password = config.getPasswd();
-
+
// Get the JMS provider
JMSProviderAdapter adapter = getJMSProviderAdapter();
log.debug("provider adapter: " + adapter);
-
+
// Connect to the JNDI server and get a reference to root context
Context context = adapter.getInitialContext();
log.debug("context: " + context);
-
+
// if we can't get the root context then exit with an exception
- if (context == null) {
- throw new RuntimeException("Failed to get the root context");
+ if (context == null)
+ {
+ throw new RuntimeException("Failed to get the root context");
}
-
+
// Get the JNDI suffix of the destination
String jndiSuffix = parseJndiSuffix(destinationJNDI,
- config.getEjbName());
+ config.getEjbName());
log.debug("jndiSuffix: " + jndiSuffix);
-
+
+ // Unfortunately the destination is optional, so if we do not have one
+ // here we have to look it up if we have a destinationJNDI, else give it
+ // a default.
+ if (destinationType == null)
+ {
+ log.info("No message-driven-destination given, guessing type");
+ destinationType = getDestinationType(context, destinationJNDI);
+ }
+
if (destinationType.equals("javax.jms.Topic"))
{
- log.debug("Got destination type Topic for " + config.getEjbName());
-
- // create a topic connection
- Object factory = context.lookup(adapter.getTopicFactoryRef());
- TopicConnection tConnection =
- (TopicConnection)ConnectionFactoryHelper.createTopicConnection
- (factory, user, password);
- connection = tConnection;
-
- // lookup or create the destination topic
- Topic topic =
- (Topic)createDestination(Topic.class,
- context,
- "topic/" + jndiSuffix,
- jndiSuffix);
-
- // set up the server session pool
- pool = createSessionPool(tConnection,
- maxPoolSize,
- true, // tx
- acknowledgeMode,
- new MessageListenerImpl(this));
-
- // To be no-durable or durable
- if (config.getSubscriptionDurability() !=
- MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
- {
- // Create non durable
- connectionConsumer = tConnection.
- createConnectionConsumer(topic,
- messageSelector,
- pool,
- maxMessagesNr);
- }
- else {
- //Durable subscription
- String clientId = config.getClientId();
- String durableName =
- clientId != null ? clientId: config.getEjbName();
-
- connectionConsumer = tConnection.
- createDurableConnectionConsumer(topic,
- durableName,
- messageSelector,
- pool,
- maxMessagesNr);
- }
-
- log.debug("Topic connectionConsumer set up");
+ log.debug("Got destination type Topic for " + config.getEjbName());
+
+ // create a topic connection
+ Object factory = context.lookup(adapter.getTopicFactoryRef());
+ TopicConnection tConnection =
+ (TopicConnection)ConnectionFactoryHelper.createTopicConnection
+ (factory, user, password);
+ connection = tConnection;
+
+ // lookup or create the destination topic
+ Topic topic = (Topic)createDestination(Topic.class,
+ context,
+ "topic/" + jndiSuffix,
+ jndiSuffix);
+
+ // set up the server session pool
+ pool = createSessionPool(tConnection,
+ maxPoolSize,
+ true, // tx
+ acknowledgeMode ,
+ new MessageListenerImpl(this));
+
+ // To be no-durable or durable
+ if (config.getSubscriptionDurability() !=
+ MessageDrivenMetaData.DURABLE_SUBSCRIPTION)
+ {
+ // Create non durable
+ connectionConsumer =
+ tConnection.createConnectionConsumer(topic,
+ messageSelector,
+ pool,
+ maxMessagesNr);
+ }
+ else
+ {
+ //Durable subscription
+ String clientId = config.getClientId();
+ String durableName =
+ clientId != null ? clientId : config.getEjbName();
+
+ connectionConsumer =
+ tConnection.createDurableConnectionConsumer(topic,
+ durableName,
+ messageSelector,
+ pool,
+ maxMessagesNr);
+ }
+
+ log.debug("Topic connectionConsumer set up");
}
else if (destinationType.equals("javax.jms.Queue"))
{
- log.debug("Got destination type Queue for " + config.getEjbName());
-
- // create a queue connection
- Object qFactory = context.lookup(adapter.getQueueFactoryRef());
- QueueConnection qConnection =
- (QueueConnection)ConnectionFactoryHelper.createQueueConnection
- (qFactory, user, password);
- connection = qConnection;
-
- // lookup or create the destination queue
- Queue queue =
- (Queue)createDestination(Queue.class,
- context,
- "queue/" + jndiSuffix,
- jndiSuffix);
-
- // set up the server session pool
- pool = createSessionPool(qConnection,
- maxPoolSize,
- true, // tx
- acknowledgeMode,
- new MessageListenerImpl(this));
- log.debug("server session pool: " + pool);
-
- // create the connection consumer
- connectionConsumer = qConnection.
- createConnectionConsumer(queue,
- messageSelector,
- pool,
- maxMessagesNr);
- log.debug("connection consumer: " + connectionConsumer);
+ log.debug("Got destination type Queue for " + config.getEjbName());
+
+ // create a queue connection
+ Object qFactory = context.lookup(adapter.getQueueFactoryRef());
+ QueueConnection qConnection =
+ (QueueConnection)ConnectionFactoryHelper.createQueueConnection
+ (qFactory, user, password);
+ connection = qConnection;
+
+ // lookup or create the destination queue
+ Queue queue = (Queue)createDestination(Queue.class,
+ context,
+ "queue/" + jndiSuffix,
+ jndiSuffix);
+
+ // set up the server session pool
+ pool = createSessionPool(qConnection,
+ maxPoolSize,
+ true,
+ // tx
+ acknowledgeMode,
+ new MessageListenerImpl(this));
+ log.debug("server session pool: " + pool);
+
+ // create the connection consumer
+ connectionConsumer =
+ qConnection.createConnectionConsumer(queue,
+ messageSelector,
+ pool,
+ maxMessagesNr);
+ log.debug("connection consumer: " + connectionConsumer);
}
-
- log.debug("initialized");
+
+ log.debug("initialized with config " + toString());
}
-
+
/**
+ * #Description of the Method
+ *
+ * @param id Description of Parameter
+ * @param m Description of Parameter
+ * @param args Description of Parameter
+ * @param tx Description of Parameter
+ * @param identity Description of Parameter
+ * @param credential Description of Parameter
+ * @return Description of the Returned Value
+ * @exception Exception Description of Exception
+ */
+ public Object invoke(Object id,
+ Method m,
+ Object[] args,
+ Transaction tx,
+ Principal identity,
+ Object credential)
+ throws Exception
+ {
+ MethodInvocation mi =
+ new MethodInvocation(id, m, args, tx, identity, credential);
+
+ // Set the right context classloader
+ ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(container.getClassLoader());
+
+ try
+ {
+ return container.invoke(mi);
+ }
+ finally
+ {
+ Thread.currentThread().setContextClassLoader(oldCl);
+ }
+ }
+
+ /**
* Start the connection.
+ *
+ * @exception Exception Description of Exception
*/
public void start() throws Exception
{
@@ -488,7 +601,7 @@
connection.setExceptionListener(exListener);
connection.start();
}
-
+
/**
* Stop the connection.
*/
@@ -496,223 +609,456 @@
{
log.debug("Stopping JMSContainerInvoker for bean " + beanName);
// Silence the exception listener
- if (exListener != null) {
- exListener.stop();
+ if (exListener != null)
+ {
+ exListener.stop();
}
innerStop();
}
-
+
/**
- * Stop done from inside, we should not stop the
- * exceptionListener in inner stop.
+ * Try to get a destination type by looking up the destination JNDI, or
+ * provide a default if there is not destinationJNDI or if it is not possible
+ * to lookup.
+ *
+ * @param ctx The naming context to lookup destinations from.
+ * @param destinationJNDI The name to use when looking up destinations.
+ * @return The destination type, either derived from
+ * destinationJDNI or DEFAULT_DESTINATION_TYPE
*/
- protected void innerStop() {
- try {
- if (connection != null) {
- connection.setExceptionListener(null);
- log.debug("unset exception listener");
- }
- } catch (Exception e) {
- log.error("Could not set ExceptionListener to null", e);
+ protected String getDestinationType(Context ctx, String destinationJNDI)
+ {
+ String destType = null;
+
+ if (destinationJNDI != null)
+ {
+ try
+ {
+ Destination dest = (Destination)ctx.lookup(destinationJNDI);
+ if (dest instanceof javax.jms.Topic)
+ {
+ destType = "javax.jms.Topic";
+ }
+ else if (dest instanceof javax.jms.Queue)
+ {
+ destType = "javax.jms.Queue";
+ }
+ }
+ catch (NamingException ex)
+ {
+ log.debug("Could not do heristic lookup of destination ", ex);
+ }
+
}
-
- // Stop the connection
- try {
- if (connection != null) {
- connection.stop();
- log.debug("connection stopped");
- }
- } catch (Exception e) {
- log.error("Could not stop JMS connection", e);
+ if (destType == null)
+ {
+ log.info("WARNING Could not determine destination type, defaults to: " +
DEFAULT_DESTINATION_TYPE);
+ destType = DEFAULT_DESTINATION_TYPE;
}
+ return destType;
}
-
+
/**
- * Take down all fixtures.
+ * Return the JMSProviderAdapter that should be used.
+ *
+ * @return The JMSProviderAdapter to use.
+ * @exception NamingException Description of Exception
*/
- public void destroy()
+ protected JMSProviderAdapter getJMSProviderAdapter()
+ throws NamingException
{
- log.debug("Destroying JMSContainerInvoker for bean " + beanName);
-
- // close the connection consumer
- try {
- if (connectionConsumer != null) {
- connectionConsumer.close();
- }
- } catch (Exception e) {
- log.error("Could not close consumer", e);
+ Context context = new InitialContext();
+ try
+ {
+ log.debug("looking up provider adapter: " + providerAdapterJNDI);
+ return (JMSProviderAdapter)context.lookup(providerAdapterJNDI);
}
-
- // clear the server session pool (if it is clearable)
- try {
- if (pool instanceof StdServerSessionPool) {
- StdServerSessionPool p = (StdServerSessionPool)pool;
- p.clear();
- }
- } catch (Exception e) {
- log.error("Could not clear ServerSessionPool", e);
+ finally
+ {
+ context.close();
}
-
- // close the connection
- if (connection != null) {
- try {
- connection.close();
- } catch (Exception e) {
- log.error("Could not close connection", e);
- }
+ }
+
+ /**
+ * Create and or lookup a JMS destination.
+ *
+ * @param type Either javax.jms.Queue or
+ * javax.jms.Topic.
+ * @param ctx The naming context to lookup
+ * destinations from.
+ * @param jndiName The name to use when looking up
+ * destinations.
+ * @param jndiSuffix The name to use when creating
+ * destinations.
+ * @return The destination.
+ * @throws IllegalArgumentException Type is not Queue or Topic.
+ * @exception Exception Description of Exception
+ */
+ protected Destination createDestination(final Class type,
+ final Context ctx,
+ final String jndiName,
+ final String jndiSuffix)
+ throws Exception
+ {
+ try
+ {
+ // first try to look it up
+ return (Destination)ctx.lookup(jndiName);
}
+ catch (NamingException e)
+ {
+ // if the lookup failes, the try to create it
+ log.warn("destination not found: " + jndiName + " reason: " + e);
+ log.warn("creating a new temporary destination: " + jndiName);
+ //
+ // attempt to create the destination (note, this is very
+ // very, very unportable).
+ //
+ MBeanServer server = (MBeanServer)
+ MBeanServerFactory.findMBeanServer(null).iterator().next();
+
+ String methodName;
+ if (type == Topic.class)
+ {
+ methodName = "createTopic";
+ }
+ else if (type == Queue.class)
+ {
+ methodName = "createQueue";
+ }
+ else
+ {
+ // type was not a Topic or Queue, bad user
+ throw new IllegalArgumentException
+ ("expected javax.jms.Queue or javax.jms.Topic: " + type);
+ }
+
+ // invoke the server to create the destination
+ server.invoke(new ObjectName("JBossMQ", "service", "Server"),
+ methodName,
+ new Object[] {jndiSuffix},
+ new String[] {"java.lang.String"});
+
+ // try to look it up again
+ return (Destination)ctx.lookup(jndiName);
+ }
}
-
+
/**
- * XmlLoadable implementation
+ * Create a server session pool for the given connection.
+ *
+ * @param connection The connection to use.
+ * @param maxSession The maximum number of sessions.
+ * @param isTransacted True if the sessions are transacted.
+ * @param ack The session acknowledgement mode.
+ * @param listener The message listener.
+ * @return A server session pool.
+ * @throws JMSException
+ * @exception NamingException Description of Exception
*/
- public void importXml(Element element) throws DeploymentException
+ protected ServerSessionPool
+ createSessionPool(final Connection connection,
+ final int maxSession,
+ final boolean isTransacted,
+ final int ack,
+ final MessageListener listener)
+ throws NamingException, JMSException
{
- try {
- String maxMessages = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "MaxMessages"));
- maxMessagesNr = Integer.parseInt(maxMessages);
-
- String maxSize = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "MaximumSize"));
- maxPoolSize = Integer.parseInt(maxSize);
- } catch (NumberFormatException e) {
- //Noop will take default value
- } catch (DeploymentException e) {
- //Noop will take default value
+ ServerSessionPool pool;
+ Context context = new InitialContext();
+
+ try
+ {
+ // first lookup the factory
+ log.debug("looking up session pool factory: " +
+ serverSessionPoolFactoryJNDI);
+ ServerSessionPoolFactory factory = (ServerSessionPoolFactory)
+ context.lookup(serverSessionPoolFactoryJNDI);
+
+ // the create the pool
+ pool = factory.getServerSessionPool
+ (connection, maxSession, isTransacted, ack, !isContainerManagedTx ||
isNotSupportedTx, listener);
}
-
- // If these are not found we will get a DeploymentException, I hope
- providerAdapterJNDI = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "JMSProviderAdapterJNDI"));
-
- serverSessionPoolFactoryJNDI = MetaData.getElementContent
- (MetaData.getUniqueChild(element, "ServerSessionPoolFactoryJNDI"));
-
- // Check java:/ prefix
- if (!providerAdapterJNDI.startsWith("java:/"))
- providerAdapterJNDI = "java:/"+providerAdapterJNDI;
-
- if (!serverSessionPoolFactoryJNDI.startsWith("java:/"))
- serverSessionPoolFactoryJNDI = "java:/"+serverSessionPoolFactoryJNDI;
+ finally
+ {
+ context.close();
+ }
+
+ return pool;
}
-
+
+ /**
+ * Stop done from inside, we should not stop the exceptionListener in inner
+ * stop.
+ */
+ protected void innerStop()
+ {
+ try
+ {
+ if (connection != null)
+ {
+ connection.setExceptionListener(null);
+ log.debug("unset exception listener");
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Could not set ExceptionListener to null", e);
+ }
+
+ // Stop the connection
+ try
+ {
+ if (connection != null)
+ {
+ connection.stop();
+ log.debug("connection stopped");
+ }
+ }
+ catch (Exception e)
+ {
+ log.error("Could not stop JMS connection", e);
+ }
+ }
+
+ /**
+ * Parse the JNDI suffix from the given JNDI name.
+ *
+ * @param jndiname The JNDI name used to lookup the destination.
+ * @param defautSuffix Description of Parameter
+ * @return The parsed suffix or the defaultSuffix
+ */
+ protected String parseJndiSuffix(final String jndiname,
+ final String defautSuffix)
+ {
+ // jndiSuffix is merely the name that the user has given the MDB.
+ // since the jndi name contains the message type I have to split
+ // at the "/" if there is no slash then I use the entire jndi name...
+ String jndiSuffix = "";
+ if (jndiname != null)
+ {
+ int indexOfSlash = jndiname.indexOf("/");
+ if (indexOfSlash != -1)
+ {
+ jndiSuffix = jndiname.substring(indexOfSlash + 1);
+ }
+ else
+ {
+ jndiSuffix = jndiname;
+ }
+ }
+ else
+ {
+ // if the jndi name from jboss.xml is null then lets use the ejbName
+ jndiSuffix = defautSuffix;
+ }
+
+ return jndiSuffix;
+ }
+
// Package protected ---------------------------------------------
-
+
// Protected -----------------------------------------------------
-
+
// Private -------------------------------------------------------
-
+
// Inner classes -------------------------------------------------
-
+
/**
- * An implementation of MessageListener that passes messages on
- * to the container invoker.
+ * An implementation of MessageListener that passes messages on to the
+ * container invoker.
*/
class MessageListenerImpl
implements MessageListener
{
- /** The container invoker. */
- JMSContainerInvoker invoker; // = null;
-
+ /**
+ * The container invoker.
+ */
+ JMSContainerInvoker invoker;
+ // = null;
+
/**
- * Construct a <tt>MessageListenerImpl</tt>.
+ * Construct a <tt>MessageListenerImpl</tt> .
*
- * @param invoker The container invoker. Must not be null.
+ * @param invoker The container invoker. Must not be null.
*/
- MessageListenerImpl(final JMSContainerInvoker invoker) {
- // assert invoker != null;
-
- this.invoker = invoker;
+ MessageListenerImpl(final JMSContainerInvoker invoker)
+ {
+ // assert invoker != null;
+
+ this.invoker = invoker;
}
-
+
/**
* Process a message.
*
- * @param message The message to process.
+ * @param message The message to process.
*/
public void onMessage(final Message message)
{
- // assert message != null;
-
- if (log.isDebugEnabled()) {
- log.debug("processing message: " + message);
- }
-
- Object id;
- try {
- id = message.getJMSMessageID();
- } catch (JMSException e) {
- // what ?
- id = "JMSContainerInvoker";
- }
-
- // Invoke, shuld we catch any Exceptions??
- try {
- invoker.invoke(id, // Object id - where used?
- ON_MESSAGE, // Method to invoke
- new Object[] {message}, // argument
- tm.getTransaction(), // Transaction
- null, // Principal
- null); // Cred
- }
- catch (Exception e) {
- log.error("Exception in JMSCI message listener", e);
- }
+ // assert message != null;
+
+ if (log.isTraceEnabled())
+ {
+ log.trace("processing message: " + message);
+ }
+
+ Object id;
+ try
+ {
+ id = message.getJMSMessageID();
+ }
+ catch (JMSException e)
+ {
+ // what ?
+ id = "JMSContainerInvoker";
+ }
+
+ // Invoke, shuld we catch any Exceptions??
+ try
+ {
+ // DLQHandling
+ if (useDLQ && // Is Dead Letter Queue used at all
+ message.getJMSRedelivered() && // Was message resent
+ dlqHandler.handleRedeliveredMessage(message)) //Did the DLQ handler
take care of the message
+ {
+ // Message will be placed on Dead Letter Queue,
+ // if redelivered to many times
+ return;
+ }
+
+ invoker.invoke(id,
+ // Object id - where used?
+ ON_MESSAGE,
+ // Method to invoke
+ new Object[]
+ {message},
+ // argument
+ tm.getTransaction(),
+ // Transaction
+ null,
+ // Principal
+ null);
+ // Cred
+ }
+ catch (Exception e)
+ {
+ log.error("Exception in JMSCI message listener", e);
+ }
}
}
-
+
/**
* ExceptionListener for failover handling.
*/
class ExceptionListenerImpl
implements ExceptionListener
{
- JMSContainerInvoker invoker; // = null;
- Thread currentThread; // = null;
+ JMSContainerInvoker invoker;
+ // = null;
+ Thread currentThread;
+ // = null;
boolean notStoped = true;
-
- ExceptionListenerImpl(final JMSContainerInvoker invoker) {
- this.invoker = invoker;
+
+ ExceptionListenerImpl(final JMSContainerInvoker invoker)
+ {
+ this.invoker = invoker;
}
-
- void stop() {
- log.debug("stop requested");
+
+ /**
+ * #Description of the Method
+ *
+ * @param ex Description of Parameter
+ */
+ public void onException(JMSException ex)
+ {
+ currentThread = Thread.currentThread();
+
+ log.warn("MDB lost connection to provider", ex);
+ boolean tryIt = true;
+ while (tryIt && notStoped)
+ {
+ log.info("MDB Trying to reconnect...");
+ try
+ {
+ try
+ {
+ Thread.sleep(reconnectInterval);
+ }
+ catch (InterruptedException ie)
+ {
+ tryIt = false;
+ return;
+ }
+
+ // Reboot container
+ invoker.innerStop();
+ invoker.destroy();
+ invoker.init();
+ invoker.start();
+ tryIt = false;
+ log.info("OK - reconnected");
+ }
+ catch (Exception e)
+ {
+ log.error("MDB error reconnecting", e);
+ }
+ }
+ currentThread = null;
+ }
- notStoped = false;
- if (currentThread != null) {
- currentThread.interrupt();
- log.debug("current thread interrupted");
- }
- }
-
- public void onException(JMSException ex) {
- currentThread = Thread.currentThread();
-
- log.warn("MDB lost connection to provider", ex);
- boolean tryIt = true;
- while(tryIt && notStoped) {
- log.info("MDB Trying to reconnect...");
- try {
- try {
- Thread.sleep(10000);
- } catch (InterruptedException ie) {
- tryIt=false; return;
- }
-
- // Reboot container
- invoker.innerStop();
- invoker.destroy();
- invoker.init();
- invoker.start();
- tryIt = false;
- log.info("OK - reconnected");
- }
- catch (Exception e) {
- log.error("MDB error reconnecting", e);
- }
- }
- currentThread = null;
+ void stop()
+ {
+ log.debug("stop requested");
+
+ notStoped = false;
+ if (currentThread != null)
+ {
+ currentThread.interrupt();
+ log.debug("current thread interrupted");
+ }
+ }
+ }
+
+ /**
+ * Return a string representation of the current config state.
+ */
+ public String toString()
+ {
+ StringBuffer buff = new StringBuffer();
+ buff.append("JMSContainerInvoker: {");
+ buff.append("beanName=").append(beanName);
+ buff.append(";maxMessagesNr=").append(maxMessagesNr);
+ buff.append(";maxPoolSize=").append(maxPoolSize);
+ buff.append(";reconnectInterval=").append(reconnectInterval);
+ buff.append(";providerAdapterJNDI=").append(providerAdapterJNDI);
+
buff.append(";serverSessionPoolFactoryJNDI=").append(serverSessionPoolFactoryJNDI);
+ buff.append(";acknowledgeMode=").append(acknowledgeMode);
+ buff.append(";isContainerManagedTx=").append(isContainerManagedTx);
+ buff.append(";isNotSupportedTx=").append(isNotSupportedTx);
+ buff.append(";useDLQ=").append(useDLQ);
+ if (dlqHandler != null)
+ buff.append(";dlqHandler=").append(dlqHandler.toString());
+ buff.append("}");
+ return buff.toString();
+ }
+
+ /**
+ * Initialize the ON_MESSAGE reference.
+ */
+ static
+ {
+ try
+ {
+ final Class type = MessageListener.class;
+ final Class arg = Message.class;
+ ON_MESSAGE = type.getMethod("onMessage", new Class[]
+ {arg});
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw new ExceptionInInitializerError(e);
}
}
}
_______________________________________________
Jboss-development mailing list
[EMAIL PROTECTED]
https://lists.sourceforge.net/lists/listinfo/jboss-development