http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASession.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASession.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASession.java new file mode 100644 index 0000000..d1cad59 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASession.java @@ -0,0 +1,1903 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.ra; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.IllegalStateException; +import javax.jms.InvalidDestinationException; +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; +import javax.jms.TopicSubscriber; +import javax.jms.TransactionInProgressException; +import javax.jms.XAQueueSession; +import javax.jms.XATopicSession; +import javax.resource.ResourceException; +import javax.resource.spi.ConnectionEvent; +import javax.resource.spi.ManagedConnection; +import javax.transaction.xa.XAResource; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal; +import org.apache.activemq.jms.client.ActiveMQSession; + + +/** + * A joint interface for JMS sessions + * + * @author <a href="mailto:adr...@jboss.com">Adrian Brock</a> + * @author <a href="mailto:jesper.peder...@jboss.org">Jesper Pedersen</a> + * @author <a href="mailto:mtay...@redhat.com">Martyn Taylor</a> + */ +public final class ActiveMQRASession implements QueueSession, TopicSession, XAQueueSession, XATopicSession +{ + /** + * Trace enabled + */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + + /** + * The managed connection + */ + private ActiveMQRAManagedConnection mc; + + /** + * The connection request info + */ + private final ActiveMQRAConnectionRequestInfo cri; + + /** + * The session factory + */ + private ActiveMQRASessionFactory sf; + + /** + * The message consumers + */ + private final Set<MessageConsumer> consumers; + + /** + * The message producers + */ + private final Set<MessageProducer> producers; + + /** + * Constructor + * + * @param mc The managed connection + * @param cri The connection request info + */ + public ActiveMQRASession(final ActiveMQRAManagedConnection mc, final ActiveMQRAConnectionRequestInfo cri) + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("constructor(" + mc + ", " + cri + ")"); + } + + this.mc = mc; + this.cri = cri; + sf = null; + consumers = new HashSet<MessageConsumer>(); + producers = new HashSet<MessageProducer>(); + } + + /** + * Set the session factory + * + * @param sf The session factory + */ + public void setActiveMQSessionFactory(final ActiveMQRASessionFactory sf) + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("setActiveMQSessionFactory(" + sf + ")"); + } + + this.sf = sf; + } + + /** + * Lock + * + * @throws JMSException Thrown if an error occurs + * @throws IllegalStateException The session is closed + */ + protected void lock() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("lock()"); + } + + final ActiveMQRAManagedConnection mcLocal = this.mc; + if (mcLocal != null) + { + mcLocal.tryLock(); + } + else + { + throw new IllegalStateException("Connection is not associated with a managed connection. " + this); + } + } + + /** + * Unlock + */ + protected void unlock() + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("unlock()"); + } + + final ActiveMQRAManagedConnection mcLocal = this.mc; + if (mcLocal != null) + { + mcLocal.unlock(); + } + + // We recreate the lock when returned to the pool + // so missing the unlock after disassociation is not important + } + + /** + * Create a bytes message + * + * @return The message + * @throws JMSException Thrown if an error occurs + */ + public BytesMessage createBytesMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createBytesMessage" + session); + } + + return session.createBytesMessage(); + } + + /** + * Create a map message + * + * @return The message + * @throws JMSException Thrown if an error occurs + */ + public MapMessage createMapMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createMapMessage(), " + session); + } + + return session.createMapMessage(); + } + + /** + * Create a message + * + * @return The message + * @throws JMSException Thrown if an error occurs + */ + public Message createMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createMessage" + session); + } + + return session.createMessage(); + } + + /** + * Create an object message + * + * @return The message + * @throws JMSException Thrown if an error occurs + */ + public ObjectMessage createObjectMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createObjectMessage" + session); + } + + return session.createObjectMessage(); + } + + /** + * Create an object message + * + * @param object The object + * @return The message + * @throws JMSException Thrown if an error occurs + */ + public ObjectMessage createObjectMessage(final Serializable object) throws JMSException + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createObjectMessage(" + object + ")" + session); + } + + return session.createObjectMessage(object); + } + + /** + * Create a stream message + * + * @return The message + * @throws JMSException Thrown if an error occurs + */ + public StreamMessage createStreamMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createStreamMessage" + session); + } + + return session.createStreamMessage(); + } + + /** + * Create a text message + * + * @return The message + * @throws JMSException Thrown if an error occurs + */ + public TextMessage createTextMessage() throws JMSException + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createTextMessage" + session); + } + + return session.createTextMessage(); + } + + /** + * Create a text message + * + * @param string The text + * @return The message + * @throws JMSException Thrown if an error occurs + */ + public TextMessage createTextMessage(final String string) throws JMSException + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createTextMessage(" + string + ")" + session); + } + + return session.createTextMessage(string); + } + + /** + * Get transacted + * + * @return True if transacted; otherwise false + * @throws JMSException Thrown if an error occurs + */ + public boolean getTransacted() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getTransacted()"); + } + + getSessionInternal(); + return cri.isTransacted(); + } + + /** + * Get the message listener -- throws IllegalStateException + * + * @return The message listener + * @throws JMSException Thrown if an error occurs + */ + public MessageListener getMessageListener() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getMessageListener()"); + } + + throw new IllegalStateException("Method not allowed"); + } + + /** + * Set the message listener -- Throws IllegalStateException + * + * @param listener The message listener + * @throws JMSException Thrown if an error occurs + */ + public void setMessageListener(final MessageListener listener) throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("setMessageListener(" + listener + ")"); + } + + throw new IllegalStateException("Method not allowed"); + } + + /** + * Always throws an Error. + * + * @throws Error Method not allowed. + */ + public void run() + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("run()"); + } + + throw new Error("Method not allowed"); + } + + /** + * Closes the session. Sends a ConnectionEvent.CONNECTION_CLOSED to the + * managed connection. + * + * @throws JMSException Failed to close session. + */ + public void close() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("close()"); + } + + sf.closeSession(this); + closeSession(); + } + + /** + * Commit + * + * @throws JMSException Failed to close session. + */ + public void commit() throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.XA_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION || + cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new TransactionInProgressException("XA connection"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (cri.isTransacted() == false) + { + throw new IllegalStateException("Session is not transacted"); + } + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("Commit session " + this); + } + + session.commit(); + } + finally + { + unlock(); + } + } + + /** + * Rollback + * + * @throws JMSException Failed to close session. + */ + public void rollback() throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.XA_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION || + cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new TransactionInProgressException("XA connection"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (cri.isTransacted() == false) + { + throw new IllegalStateException("Session is not transacted"); + } + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("Rollback session " + this); + } + + session.rollback(); + } + finally + { + unlock(); + } + } + + /** + * Recover + * + * @throws JMSException Failed to close session. + */ + public void recover() throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (cri.isTransacted()) + { + throw new IllegalStateException("Session is transacted"); + } + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("Recover session " + this); + } + + session.recover(); + } + finally + { + unlock(); + } + } + + /** + * Create a topic + * + * @param topicName The topic name + * @return The topic + * @throws JMSException Thrown if an error occurs + */ + public Topic createTopic(final String topicName) throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot create topic for javax.jms.QueueSession"); + } + + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createTopic " + session + " topicName=" + topicName); + } + + Topic result = session.createTopic(topicName); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdTopic " + session + " topic=" + result); + } + + return result; + } + + /** + * Create a topic subscriber + * + * @param topic The topic + * @return The subscriber + * @throws JMSException Thrown if an error occurs + */ + public TopicSubscriber createSubscriber(final Topic topic) throws JMSException + { + lock(); + try + { + TopicSession session = getTopicSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createSubscriber " + session + " topic=" + topic); + } + + TopicSubscriber result = session.createSubscriber(topic); + result = new ActiveMQRATopicSubscriber(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdSubscriber " + session + " ActiveMQTopicSubscriber=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a topic subscriber + * + * @param topic The topic + * @param messageSelector The message selector + * @param noLocal If true inhibits the delivery of messages published by its own connection + * @return The subscriber + * @throws JMSException Thrown if an error occurs + */ + public TopicSubscriber createSubscriber(final Topic topic, final String messageSelector, final boolean noLocal) throws JMSException + { + lock(); + try + { + TopicSession session = getTopicSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createSubscriber " + session + + " topic=" + + topic + + " selector=" + + messageSelector + + " noLocal=" + + noLocal); + } + + TopicSubscriber result = session.createSubscriber(topic, messageSelector, noLocal); + result = new ActiveMQRATopicSubscriber(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdSubscriber " + session + " ActiveMQTopicSubscriber=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a durable topic subscriber + * + * @param topic The topic + * @param name The name + * @return The subscriber + * @throws JMSException Thrown if an error occurs + */ + public TopicSubscriber createDurableSubscriber(final Topic topic, final String name) throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot create durable subscriber from javax.jms.QueueSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createDurableSubscriber " + session + " topic=" + topic + " name=" + name); + } + + TopicSubscriber result = session.createDurableSubscriber(topic, name); + result = new ActiveMQRATopicSubscriber(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdDurableSubscriber " + session + " ActiveMQTopicSubscriber=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a topic subscriber + * + * @param topic The topic + * @param name The name + * @param messageSelector The message selector + * @param noLocal If true inhibits the delivery of messages published by its own connection + * @return The subscriber + * @throws JMSException Thrown if an error occurs + */ + public TopicSubscriber createDurableSubscriber(final Topic topic, + final String name, + final String messageSelector, + final boolean noLocal) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createDurableSubscriber " + session + + " topic=" + + topic + + " name=" + + name + + " selector=" + + messageSelector + + " noLocal=" + + noLocal); + } + + TopicSubscriber result = session.createDurableSubscriber(topic, name, messageSelector, noLocal); + result = new ActiveMQRATopicSubscriber(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdDurableSubscriber " + session + " ActiveMQTopicSubscriber=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a topic publisher + * + * @param topic The topic + * @return The publisher + * @throws JMSException Thrown if an error occurs + */ + public TopicPublisher createPublisher(final Topic topic) throws JMSException + { + lock(); + try + { + TopicSession session = getTopicSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createPublisher " + session + " topic=" + topic); + } + + TopicPublisher result = session.createPublisher(topic); + result = new ActiveMQRATopicPublisher(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdPublisher " + session + " publisher=" + result); + } + + addProducer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a temporary topic + * + * @return The temporary topic + * @throws JMSException Thrown if an error occurs + */ + public TemporaryTopic createTemporaryTopic() throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot create temporary topic for javax.jms.QueueSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createTemporaryTopic " + session); + } + + TemporaryTopic temp = session.createTemporaryTopic(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdTemporaryTopic " + session + " temp=" + temp); + } + + sf.addTemporaryTopic(temp); + + return temp; + } + finally + { + unlock(); + } + } + + /** + * Unsubscribe + * + * @param name The name + * @throws JMSException Thrown if an error occurs + */ + public void unsubscribe(final String name) throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Cannot unsubscribe for javax.jms.QueueSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("unsubscribe " + session + " name=" + name); + } + + session.unsubscribe(name); + } + finally + { + unlock(); + } + } + + /** + * Create a browser + * + * @param queue The queue + * @return The browser + * @throws JMSException Thrown if an error occurs + */ + public QueueBrowser createBrowser(final Queue queue) throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession"); + } + + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createBrowser " + session + " queue=" + queue); + } + + QueueBrowser result = session.createBrowser(queue); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdBrowser " + session + " browser=" + result); + } + + return result; + } + + /** + * Create a browser + * + * @param queue The queue + * @param messageSelector The message selector + * @return The browser + * @throws JMSException Thrown if an error occurs + */ + public QueueBrowser createBrowser(final Queue queue, final String messageSelector) throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Cannot create browser for javax.jms.TopicSession"); + } + + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createBrowser " + session + " queue=" + queue + " selector=" + messageSelector); + } + + QueueBrowser result = session.createBrowser(queue, messageSelector); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdBrowser " + session + " browser=" + result); + } + + return result; + } + + /** + * Create a queue + * + * @param queueName The queue name + * @return The queue + * @throws JMSException Thrown if an error occurs + */ + public Queue createQueue(final String queueName) throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Cannot create browser or javax.jms.TopicSession"); + } + + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createQueue " + session + " queueName=" + queueName); + } + + Queue result = session.createQueue(queueName); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdQueue " + session + " queue=" + result); + } + + return result; + } + + /** + * Create a queue receiver + * + * @param queue The queue + * @return The queue receiver + * @throws JMSException Thrown if an error occurs + */ + public QueueReceiver createReceiver(final Queue queue) throws JMSException + { + lock(); + try + { + QueueSession session = getQueueSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createReceiver " + session + " queue=" + queue); + } + + QueueReceiver result = session.createReceiver(queue); + result = new ActiveMQRAQueueReceiver(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdReceiver " + session + " receiver=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a queue receiver + * + * @param queue The queue + * @param messageSelector + * @return The queue receiver + * @throws JMSException Thrown if an error occurs + */ + public QueueReceiver createReceiver(final Queue queue, final String messageSelector) throws JMSException + { + lock(); + try + { + QueueSession session = getQueueSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createReceiver " + session + " queue=" + queue + " selector=" + messageSelector); + } + + QueueReceiver result = session.createReceiver(queue, messageSelector); + result = new ActiveMQRAQueueReceiver(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdReceiver " + session + " receiver=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a queue sender + * + * @param queue The queue + * @return The queue sender + * @throws JMSException Thrown if an error occurs + */ + public QueueSender createSender(final Queue queue) throws JMSException + { + lock(); + try + { + QueueSession session = getQueueSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createSender " + session + " queue=" + queue); + } + + QueueSender result = session.createSender(queue); + result = new ActiveMQRAQueueSender(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdSender " + session + " sender=" + result); + } + + addProducer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a temporary queue + * + * @return The temporary queue + * @throws JMSException Thrown if an error occurs + */ + public TemporaryQueue createTemporaryQueue() throws JMSException + { + if (cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Cannot create temporary queue for javax.jms.TopicSession"); + } + + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createTemporaryQueue " + session); + } + + TemporaryQueue temp = session.createTemporaryQueue(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdTemporaryQueue " + session + " temp=" + temp); + } + + sf.addTemporaryQueue(temp); + + return temp; + } + finally + { + unlock(); + } + } + + /** + * Create a message consumer + * + * @param destination The destination + * @return The message consumer + * @throws JMSException Thrown if an error occurs + */ + public MessageConsumer createConsumer(final Destination destination) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createConsumer " + session + " dest=" + destination); + } + + MessageConsumer result = session.createConsumer(destination); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a message consumer + * + * @param destination The destination + * @param messageSelector The message selector + * @return The message consumer + * @throws JMSException Thrown if an error occurs + */ + public MessageConsumer createConsumer(final Destination destination, final String messageSelector) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createConsumer " + session + + " dest=" + + destination + + " messageSelector=" + + messageSelector); + } + + MessageConsumer result = session.createConsumer(destination, messageSelector); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a message consumer + * + * @param destination The destination + * @param messageSelector The message selector + * @param noLocal If true inhibits the delivery of messages published by its own connection + * @return The message consumer + * @throws JMSException Thrown if an error occurs + */ + public MessageConsumer createConsumer(final Destination destination, + final String messageSelector, + final boolean noLocal) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createConsumer " + session + + " dest=" + + destination + + " messageSelector=" + + messageSelector + + " noLocal=" + + noLocal); + } + + MessageConsumer result = session.createConsumer(destination, messageSelector, noLocal); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Create a message producer + * + * @param destination The destination + * @return The message producer + * @throws JMSException Thrown if an error occurs + */ + public MessageProducer createProducer(final Destination destination) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createProducer " + session + " dest=" + destination); + } + + MessageProducer result = session.createProducer(destination); + result = new ActiveMQRAMessageProducer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdProducer " + session + " producer=" + result); + } + + addProducer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Get the acknowledge mode + * + * @return The mode + * @throws JMSException Thrown if an error occurs + */ + public int getAcknowledgeMode() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getAcknowledgeMode()"); + } + + getSessionInternal(); + return cri.getAcknowledgeMode(); + } + + /** + * Get the XA resource + * + * @return The XA resource + * @throws IllegalStateException If non XA connection + */ + public XAResource getXAResource() + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getXAResource()"); + } + + if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || + cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION) + { + return null; + } + + try + { + lock(); + + return getXAResourceInternal(); + } + catch (Throwable t) + { + return null; + } + finally + { + unlock(); + } + } + + /** + * Returns the ID of the Node that this session is associated with. + * + * @return Node ID + */ + public String getNodeId() throws JMSException + { + ActiveMQSession session = (ActiveMQSession) getSessionInternal(); + ClientSessionFactoryInternal factory = (ClientSessionFactoryInternal) session.getCoreSession().getSessionFactory(); + return factory.getLiveNodeId(); + } + + /** + * Get the session + * + * @return The session + * @throws JMSException Thrown if an error occurs + */ + public Session getSession() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getNonXAsession()"); + } + + if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || + cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION) + { + throw new IllegalStateException("Non XA connection"); + } + + lock(); + try + { + return this; + } + finally + { + unlock(); + } + } + + /** + * Get the queue session + * + * @return The queue session + * @throws JMSException Thrown if an error occurs + */ + public QueueSession getQueueSession() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getQueueSession()"); + } + + if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || + cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION) + { + throw new IllegalStateException("Non XA connection"); + } + + lock(); + try + { + return this; + } + finally + { + unlock(); + } + } + + /** + * Get the topic session + * + * @return The topic session + * @throws JMSException Thrown if an error occurs + */ + public TopicSession getTopicSession() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getTopicSession()"); + } + + if (cri.getType() == ActiveMQRAConnectionFactory.CONNECTION || cri.getType() == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || + cri.getType() == ActiveMQRAConnectionFactory.TOPIC_CONNECTION) + { + throw new IllegalStateException("Non XA connection"); + } + + lock(); + try + { + return this; + } + finally + { + unlock(); + } + } + + @Override + public MessageConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createSharedConsumer " + session + " topic=" + topic + ", sharedSubscriptionName=" + sharedSubscriptionName); + } + + MessageConsumer result = session.createSharedConsumer(topic, sharedSubscriptionName); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + @Override + public MessageConsumer createSharedConsumer(final Topic topic, final String sharedSubscriptionName, + final String messageSelector) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createSharedConsumer " + session + " topic=" + topic + + ", sharedSubscriptionName=" + sharedSubscriptionName + ", messageSelector=" + messageSelector); + } + + MessageConsumer result = session.createSharedConsumer(topic, sharedSubscriptionName, messageSelector); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + @Override + public MessageConsumer createDurableConsumer(final Topic topic, final String name) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createSharedConsumer " + session + " topic=" + topic + ", name=" + name); + } + + MessageConsumer result = session.createDurableConsumer(topic, name); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + @Override + public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createDurableConsumer " + session + " topic=" + topic + ", name=" + name + + ", messageSelector=" + messageSelector + ", noLocal=" + noLocal); + } + + MessageConsumer result = session.createDurableConsumer(topic, name, messageSelector, noLocal); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createSharedDurableConsumer " + session + " topic=" + topic + ", name=" + + name); + } + + MessageConsumer result = session.createSharedDurableConsumer(topic, name); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + @Override + public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector) throws JMSException + { + lock(); + try + { + Session session = getSessionInternal(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createSharedDurableConsumer " + session + " topic=" + topic + ", name=" + + name + ", messageSelector=" + messageSelector); + } + + MessageConsumer result = session.createSharedDurableConsumer(topic, name, messageSelector); + result = new ActiveMQRAMessageConsumer(result, this); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("createdConsumer " + session + " consumer=" + result); + } + + addConsumer(result); + + return result; + } + finally + { + unlock(); + } + } + + /** + * Set the managed connection + * + * @param managedConnection The managed connection + */ + void setManagedConnection(final ActiveMQRAManagedConnection managedConnection) + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("setManagedConnection(" + managedConnection + ")"); + } + + if (mc != null) + { + mc.removeHandle(this); + } + + mc = managedConnection; + } + + /** + * for tests only + */ + public ManagedConnection getManagedConnection() + { + return mc; + } + + /** + * Destroy + */ + void destroy() + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("destroy()"); + } + + mc = null; + } + + /** + * Start + * + * @throws JMSException Thrown if an error occurs + */ + void start() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("start()"); + } + + if (mc != null) + { + mc.start(); + } + } + + /** + * Stop + * + * @throws JMSException Thrown if an error occurs + */ + void stop() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("stop()"); + } + + if (mc != null) + { + mc.stop(); + } + } + + /** + * Check strict + * + * @throws JMSException Thrown if an error occurs + */ + void checkStrict() throws JMSException + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("checkStrict()"); + } + + if (mc != null) + { + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + } + + /** + * Close session + * + * @throws JMSException Thrown if an error occurs + */ + void closeSession() throws JMSException + { + if (mc != null) + { + ActiveMQRALogger.LOGGER.trace("Closing session"); + + try + { + mc.stop(); + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.trace("Error stopping managed connection", t); + } + + synchronized (consumers) + { + for (Iterator<MessageConsumer> i = consumers.iterator(); i.hasNext(); ) + { + ActiveMQRAMessageConsumer consumer = (ActiveMQRAMessageConsumer) i.next(); + try + { + consumer.closeConsumer(); + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.trace("Error closing consumer", t); + } + i.remove(); + } + } + + synchronized (producers) + { + for (Iterator<MessageProducer> i = producers.iterator(); i.hasNext(); ) + { + ActiveMQRAMessageProducer producer = (ActiveMQRAMessageProducer) i.next(); + try + { + producer.closeProducer(); + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.trace("Error closing producer", t); + } + i.remove(); + } + } + + mc.removeHandle(this); + ConnectionEvent ev = new ConnectionEvent(mc, ConnectionEvent.CONNECTION_CLOSED); + ev.setConnectionHandle(this); + mc.sendEvent(ev); + mc = null; + } + } + + /** + * Add consumer + * + * @param consumer The consumer + */ + void addConsumer(final MessageConsumer consumer) + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("addConsumer(" + consumer + ")"); + } + + synchronized (consumers) + { + consumers.add(consumer); + } + } + + /** + * Remove consumer + * + * @param consumer The consumer + */ + void removeConsumer(final MessageConsumer consumer) + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("removeConsumer(" + consumer + ")"); + } + + synchronized (consumers) + { + consumers.remove(consumer); + } + } + + /** + * Add producer + * + * @param producer The producer + */ + void addProducer(final MessageProducer producer) + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("addProducer(" + producer + ")"); + } + + synchronized (producers) + { + producers.add(producer); + } + } + + /** + * Remove producer + * + * @param producer The producer + */ + void removeProducer(final MessageProducer producer) + { + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("removeProducer(" + producer + ")"); + } + + synchronized (producers) + { + producers.remove(producer); + } + } + + /** + * Get the session and ensure that it is open + * + * @return The session + * @throws JMSException Thrown if an error occurs + * @throws IllegalStateException The session is closed + */ + Session getSessionInternal() throws JMSException + { + if (mc == null) + { + throw new IllegalStateException("The session is closed"); + } + + Session session = mc.getSession(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getSessionInternal " + session + " for " + this); + } + + return session; + } + + /** + * Get the XA resource and ensure that it is open + * + * @return The XA Resource + * @throws JMSException Thrown if an error occurs + * @throws IllegalStateException The session is closed + */ + XAResource getXAResourceInternal() throws JMSException + { + if (mc == null) + { + throw new IllegalStateException("The session is closed"); + } + + try + { + XAResource xares = mc.getXAResource(); + + if (ActiveMQRASession.trace) + { + ActiveMQRALogger.LOGGER.trace("getXAResourceInternal " + xares + " for " + this); + } + + return xares; + } + catch (ResourceException e) + { + JMSException jmse = new JMSException("Unable to get XA Resource"); + jmse.initCause(e); + throw jmse; + } + } + + /** + * Get the queue session + * + * @return The queue session + * @throws JMSException Thrown if an error occurs + * @throws IllegalStateException The session is closed + */ + QueueSession getQueueSessionInternal() throws JMSException + { + Session s = getSessionInternal(); + if (!(s instanceof QueueSession)) + { + throw new InvalidDestinationException("Attempting to use QueueSession methods on: " + this); + } + return (QueueSession) s; + } + + /** + * Get the topic session + * + * @return The topic session + * @throws JMSException Thrown if an error occurs + * @throws IllegalStateException The session is closed + */ + TopicSession getTopicSessionInternal() throws JMSException + { + Session s = getSessionInternal(); + if (!(s instanceof TopicSession)) + { + throw new InvalidDestinationException("Attempting to use TopicSession methods on: " + this); + } + return (TopicSession) s; + } + + /** + * @throws SystemException + * @throws RollbackException + */ + public void checkState() throws JMSException + { + if (mc != null) + { + mc.checkTransactionActive(); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactory.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactory.java new file mode 100644 index 0000000..eba16f5 --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.ra; + +import javax.jms.JMSException; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.XAQueueConnection; +import javax.jms.XATopicConnection; + +/** + * A joint interface for all connection types + * + * @author <a href="mailto:adr...@jboss.com">Adrian Brock</a> + * @author <a href="mailto:jesper.peder...@jboss.org">Jesper Pedersen</a> + * @version $Revision: 71554 $ + */ +public interface ActiveMQRASessionFactory extends XATopicConnection, XAQueueConnection +{ + /** Error message for strict behaviour */ + String ISE = "This method is not applicable inside the application server. See the J2EE spec, e.g. J2EE1.4 Section 6.6"; + + /** + * Add a temporary queue + * @param temp The temporary queue + */ + void addTemporaryQueue(TemporaryQueue temp); + + /** + * Add a temporary topic + * @param temp The temporary topic + */ + void addTemporaryTopic(TemporaryTopic temp); + + /** + * Notification that a session is closed + * @param session The session + * @throws JMSException for any error + */ + void closeSession(ActiveMQRASession session) throws JMSException; +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/034adfbf/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactoryImpl.java ---------------------------------------------------------------------- diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactoryImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactoryImpl.java new file mode 100644 index 0000000..ef743ac --- /dev/null +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQRASessionFactoryImpl.java @@ -0,0 +1,1046 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq.ra; + +import javax.jms.ConnectionConsumer; +import javax.jms.ConnectionMetaData; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; +import javax.jms.JMSContext; +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; +import javax.jms.TopicSession; +import javax.jms.XAJMSContext; +import javax.jms.XAQueueSession; +import javax.jms.XASession; +import javax.jms.XATopicSession; +import javax.naming.Reference; +import javax.resource.Referenceable; +import javax.resource.spi.ConnectionManager; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import org.apache.activemq.api.jms.ActiveMQJMSConstants; +import org.apache.activemq.jms.client.ActiveMQConnectionForContext; +import org.apache.activemq.jms.client.ActiveMQConnectionForContextImpl; + +/** + * Implements the JMS Connection API and produces {@link ActiveMQRASession} objects. + * + * @author <a href="mailto:adr...@jboss.com">Adrian Brock</a> + * @author <a href="mailto:jesper.peder...@jboss.org">Jesper Pedersen</a> + */ +public final class ActiveMQRASessionFactoryImpl extends ActiveMQConnectionForContextImpl implements + ActiveMQRASessionFactory, ActiveMQConnectionForContext, Referenceable +{ + /** + * Trace enabled + */ + private static boolean trace = ActiveMQRALogger.LOGGER.isTraceEnabled(); + + /** + * Are we closed? + */ + private boolean closed = false; + + /** + * The naming reference + */ + private Reference reference; + + /** + * The user name + */ + private String userName; + + /** + * The password + */ + private String password; + + /** + * The client ID + */ + private String clientID; + + /** + * The connection type + */ + private final int type; + + /** + * Whether we are started + */ + private boolean started = false; + + /** + * The managed connection factory + */ + private final ActiveMQRAManagedConnectionFactory mcf; + private TransactionManager tm; + + /** + * The connection manager + */ + private ConnectionManager cm; + + /** + * The sessions + */ + private final Set<ActiveMQRASession> sessions = new HashSet<ActiveMQRASession>(); + + /** + * The temporary queues + */ + private final Set<TemporaryQueue> tempQueues = new HashSet<TemporaryQueue>(); + + /** + * The temporary topics + */ + private final Set<TemporaryTopic> tempTopics = new HashSet<TemporaryTopic>(); + + /** + * Constructor + * + * @param mcf The managed connection factory + * @param cm The connection manager + * @param type The connection type + */ + public ActiveMQRASessionFactoryImpl(final ActiveMQRAManagedConnectionFactory mcf, + final ConnectionManager cm, + final TransactionManager tm, + final int type) + { + this.mcf = mcf; + + this.tm = tm; + + if (cm == null) + { + this.cm = new ActiveMQRAConnectionManager(); + } + else + { + this.cm = cm; + } + + this.type = type; + + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("constructor(" + mcf + ", " + cm + ", " + type); + } + } + + public JMSContext createContext(int sessionMode) + { + boolean inJtaTx = inJtaTransaction(); + int sessionModeToUse; + switch (sessionMode) + { + case Session.AUTO_ACKNOWLEDGE: + case Session.DUPS_OK_ACKNOWLEDGE: + case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE: + case ActiveMQJMSConstants.PRE_ACKNOWLEDGE: + sessionModeToUse = sessionMode; + break; + //these are prohibited in JEE unless not in a JTA tx where they should be ignored and auto_ack used + case Session.CLIENT_ACKNOWLEDGE: + if (!inJtaTx) + { + throw ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntime(); + } + sessionModeToUse = Session.AUTO_ACKNOWLEDGE; + break; + case Session.SESSION_TRANSACTED: + if (!inJtaTx) + { + throw ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime(); + } + sessionModeToUse = Session.AUTO_ACKNOWLEDGE; + break; + default: + throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(sessionMode); + } + incrementRefCounter(); + + return new ActiveMQRAJMSContext(this, sessionModeToUse, threadAwareContext); + } + + public XAJMSContext createXAContext() + { + incrementRefCounter(); + + return new ActiveMQRAXAJMSContext(this, threadAwareContext); + } + + /** + * Set the naming reference + * + * @param reference The reference + */ + public void setReference(final Reference reference) + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("setReference(" + reference + ")"); + } + + this.reference = reference; + } + + /** + * Get the naming reference + * + * @return The reference + */ + public Reference getReference() + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("getReference()"); + } + + return reference; + } + + /** + * Set the user name + * + * @param name The user name + */ + public void setUserName(final String name) + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("setUserName(" + name + ")"); + } + + userName = name; + } + + /** + * Set the password + * + * @param password The password + */ + public void setPassword(final String password) + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("setPassword(****)"); + } + + this.password = password; + } + + /** + * Get the client ID + * + * @return The client ID + * @throws JMSException Thrown if an error occurs + */ + @Override + public String getClientID() throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("getClientID()"); + } + + checkClosed(); + + if (clientID == null) + { + return ((ActiveMQResourceAdapter) mcf.getResourceAdapter()).getProperties().getClientID(); + } + + return clientID; + } + + /** + * Set the client ID -- throws IllegalStateException + * + * @param cID The client ID + * @throws JMSException Thrown if an error occurs + */ + @Override + public void setClientID(final String cID) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("setClientID(" + cID + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Create a queue session + * + * @param transacted Use transactions + * @param acknowledgeMode The acknowledge mode + * @return The queue session + * @throws JMSException Thrown if an error occurs + */ + public QueueSession createQueueSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createQueueSession(" + transacted + ", " + acknowledgeMode + ")"); + } + + checkClosed(); + + if (type == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || type == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Can not get a queue session from a topic connection"); + } + + return allocateConnection(transacted, acknowledgeMode, type); + } + + /** + * Create a XA queue session + * + * @return The XA queue session + * @throws JMSException Thrown if an error occurs + */ + public XAQueueSession createXAQueueSession() throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createXAQueueSession()"); + } + + checkClosed(); + + if (type == ActiveMQRAConnectionFactory.CONNECTION || type == ActiveMQRAConnectionFactory.TOPIC_CONNECTION || + type == ActiveMQRAConnectionFactory.XA_TOPIC_CONNECTION) + { + throw new IllegalStateException("Can not get a topic session from a queue connection"); + } + + return allocateConnection(type); + } + + /** + * Create a connection consumer -- throws IllegalStateException + * + * @param queue The queue + * @param messageSelector The message selector + * @param sessionPool The session pool + * @param maxMessages The number of max messages + * @return The connection consumer + * @throws JMSException Thrown if an error occurs + */ + public ConnectionConsumer createConnectionConsumer(final Queue queue, + final String messageSelector, + final ServerSessionPool sessionPool, + final int maxMessages) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + queue + + ", " + + messageSelector + + ", " + + sessionPool + + ", " + + maxMessages + + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Create a topic session + * + * @param transacted Use transactions + * @param acknowledgeMode The acknowledge mode + * @return The topic session + * @throws JMSException Thrown if an error occurs + */ + public TopicSession createTopicSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createTopicSession(" + transacted + ", " + acknowledgeMode + ")"); + } + + checkClosed(); + + if (type == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || type == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Can not get a topic session from a queue connection"); + } + + return allocateConnection(transacted, acknowledgeMode, type); + } + + /** + * Create a XA topic session + * + * @return The XA topic session + * @throws JMSException Thrown if an error occurs + */ + public XATopicSession createXATopicSession() throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createXATopicSession()"); + } + + checkClosed(); + + if (type == ActiveMQRAConnectionFactory.CONNECTION || type == ActiveMQRAConnectionFactory.QUEUE_CONNECTION || + type == ActiveMQRAConnectionFactory.XA_QUEUE_CONNECTION) + { + throw new IllegalStateException("Can not get a topic session from a queue connection"); + } + + return allocateConnection(type); + } + + /** + * Create a connection consumer -- throws IllegalStateException + * + * @param topic The topic + * @param messageSelector The message selector + * @param sessionPool The session pool + * @param maxMessages The number of max messages + * @return The connection consumer + * @throws JMSException Thrown if an error occurs + */ + public ConnectionConsumer createConnectionConsumer(final Topic topic, + final String messageSelector, + final ServerSessionPool sessionPool, + final int maxMessages) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + topic + + ", " + + messageSelector + + ", " + + sessionPool + + ", " + + maxMessages + + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Create a durable connection consumer -- throws IllegalStateException + * + * @param topic The topic + * @param subscriptionName The subscription name + * @param messageSelector The message selector + * @param sessionPool The session pool + * @param maxMessages The number of max messages + * @return The connection consumer + * @throws JMSException Thrown if an error occurs + */ + @Override + public ConnectionConsumer createDurableConnectionConsumer(final Topic topic, + final String subscriptionName, + final String messageSelector, + final ServerSessionPool sessionPool, + final int maxMessages) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + topic + + ", " + + subscriptionName + + ", " + + messageSelector + + ", " + + sessionPool + + ", " + + maxMessages + + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Create a connection consumer -- throws IllegalStateException + * + * @param destination The destination + * @param pool The session pool + * @param maxMessages The number of max messages + * @return The connection consumer + * @throws JMSException Thrown if an error occurs + */ + public ConnectionConsumer createConnectionConsumer(final Destination destination, + final ServerSessionPool pool, + final int maxMessages) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + destination + + ", " + + pool + + ", " + + maxMessages + + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Create a connection consumer -- throws IllegalStateException + * + * @param destination The destination + * @param name The name + * @param pool The session pool + * @param maxMessages The number of max messages + * @return The connection consumer + * @throws JMSException Thrown if an error occurs + */ + @Override + public ConnectionConsumer createConnectionConsumer(final Destination destination, + final String name, + final ServerSessionPool pool, + final int maxMessages) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createConnectionConsumer(" + destination + + ", " + + name + + ", " + + pool + + ", " + + maxMessages + + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Create a session + * + * @param transacted Use transactions + * @param acknowledgeMode The acknowledge mode + * @return The session + * @throws JMSException Thrown if an error occurs + */ + @Override + public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createSession(" + transacted + ", " + acknowledgeMode + ")"); + } + + checkClosed(); + return allocateConnection(transacted, acknowledgeMode, type); + } + + /** + * Create a XA session + * + * @return The XA session + * @throws JMSException Thrown if an error occurs + */ + public XASession createXASession() throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createXASession()"); + } + + checkClosed(); + return allocateConnection(type); + } + + /** + * Get the connection metadata + * + * @return The connection metadata + * @throws JMSException Thrown if an error occurs + */ + @Override + public ConnectionMetaData getMetaData() throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("getMetaData()"); + } + + checkClosed(); + return mcf.getMetaData(); + } + + /** + * Get the exception listener -- throws IllegalStateException + * + * @return The exception listener + * @throws JMSException Thrown if an error occurs + */ + @Override + public ExceptionListener getExceptionListener() throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("getExceptionListener()"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Set the exception listener -- throws IllegalStateException + * + * @param listener The exception listener + * @throws JMSException Thrown if an error occurs + */ + @Override + public void setExceptionListener(final ExceptionListener listener) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("setExceptionListener(" + listener + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Start + * + * @throws JMSException Thrown if an error occurs + */ + @Override + public void start() throws JMSException + { + checkClosed(); + + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("start() " + this); + } + + synchronized (sessions) + { + if (started) + { + return; + } + started = true; + for (ActiveMQRASession session : sessions) + { + session.start(); + } + } + } + + /** + * Stop + * + * @throws IllegalStateException + * @throws JMSException Thrown if an error occurs + */ + @Override + public void stop() throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("stop() " + this); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Close + * + * @throws JMSException Thrown if an error occurs + */ + @Override + public void close() throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("close() " + this); + } + + if (closed) + { + return; + } + + closed = true; + + synchronized (sessions) + { + for (Iterator<ActiveMQRASession> i = sessions.iterator(); i.hasNext(); ) + { + ActiveMQRASession session = i.next(); + try + { + session.closeSession(); + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.trace("Error closing session", t); + } + i.remove(); + } + } + + synchronized (tempQueues) + { + for (Iterator<TemporaryQueue> i = tempQueues.iterator(); i.hasNext(); ) + { + TemporaryQueue temp = i.next(); + try + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("Closing temporary queue " + temp + " for " + this); + } + temp.delete(); + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.trace("Error deleting temporary queue", t); + } + i.remove(); + } + } + + synchronized (tempTopics) + { + for (Iterator<TemporaryTopic> i = tempTopics.iterator(); i.hasNext(); ) + { + TemporaryTopic temp = i.next(); + try + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("Closing temporary topic " + temp + " for " + this); + } + temp.delete(); + } + catch (Throwable t) + { + ActiveMQRALogger.LOGGER.trace("Error deleting temporary queue", t); + } + i.remove(); + } + } + } + + /** + * Close session + * + * @param session The session + * @throws JMSException Thrown if an error occurs + */ + public void closeSession(final ActiveMQRASession session) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("closeSession(" + session + ")"); + } + + synchronized (sessions) + { + sessions.remove(session); + } + } + + /** + * Add temporary queue + * + * @param temp The temporary queue + */ + public void addTemporaryQueue(final TemporaryQueue temp) + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("addTemporaryQueue(" + temp + ")"); + } + + synchronized (tempQueues) + { + tempQueues.add(temp); + } + } + + /** + * Add temporary topic + * + * @param temp The temporary topic + */ + public void addTemporaryTopic(final TemporaryTopic temp) + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("addTemporaryTopic(" + temp + ")"); + } + + synchronized (tempTopics) + { + tempTopics.add(temp); + } + } + + @Override + public Session createSession(int sessionMode) throws JMSException + { + return createSession(sessionMode == Session.SESSION_TRANSACTED, sessionMode); + } + + @Override + public Session createSession() throws JMSException + { + return createSession(Session.AUTO_ACKNOWLEDGE); + } + + @Override + public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createSharedConnectionConsumer(" + topic + ", " + subscriptionName + ", " + + messageSelector + ", " + sessionPool + ", " + maxMessages + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + @Override + public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("createSharedDurableConnectionConsumer(" + topic + ", " + subscriptionName + + ", " + messageSelector + ", " + sessionPool + ", " + maxMessages + ")"); + } + + throw new IllegalStateException(ActiveMQRASessionFactory.ISE); + } + + /** + * Allocation a connection + * + * @param sessionType The session type + * @return The session + * @throws JMSException Thrown if an error occurs + */ + protected ActiveMQRASession allocateConnection(final int sessionType) throws JMSException + { + return allocateConnection(false, Session.AUTO_ACKNOWLEDGE, sessionType); + } + + /** + * Allocate a connection + * + * @param transacted Use transactions + * @param acknowledgeMode The acknowledge mode + * @param sessionType The session type + * @return The session + * @throws JMSException Thrown if an error occurs + */ + protected ActiveMQRASession allocateConnection(boolean transacted, int acknowledgeMode, final int sessionType) throws JMSException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("allocateConnection(" + transacted + + ", " + + acknowledgeMode + + ", " + + sessionType + + ")"); + } + + try + { + synchronized (sessions) + { + if (sessions.isEmpty() == false) + { + throw new IllegalStateException("Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6"); + } + //from createSession + // In a Java EE web or EJB container, when there is an active JTA transaction in progress: + //Both arguments {@code transacted} and {@code acknowledgeMode} are ignored. + if (inJtaTransaction()) + { + transacted = true; + //from getAcknowledgeMode + // If the session is not transacted, returns the + // current acknowledgement mode for the session. + // If the session + // is transacted, returns SESSION_TRANSACTED. + acknowledgeMode = Session.SESSION_TRANSACTED; + } + //In the Java EE web or EJB container, when there is no active JTA transaction in progress + // The argument {@code transacted} is ignored. + else + { + //The session will always be non-transacted, + transacted = false; + switch (acknowledgeMode) + { + //using one of the two acknowledgement modes AUTO_ACKNOWLEDGE and DUPS_OK_ACKNOWLEDGE. + case Session.AUTO_ACKNOWLEDGE: + case Session.DUPS_OK_ACKNOWLEDGE: + //plus our own + case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE: + case ActiveMQJMSConstants.PRE_ACKNOWLEDGE: + break; + //The value {@code Session.CLIENT_ACKNOWLEDGE} may not be used. + case Session.CLIENT_ACKNOWLEDGE: + throw ActiveMQRABundle.BUNDLE.invalidClientAcknowledgeModeRuntime(); + //same with this although the spec doesn't explicitly say + case Session.SESSION_TRANSACTED: + throw ActiveMQRABundle.BUNDLE.invalidSessionTransactedModeRuntime(); + default: + throw ActiveMQRABundle.BUNDLE.invalidAcknowledgeMode(acknowledgeMode); + } + } + + ActiveMQRAConnectionRequestInfo info = new ActiveMQRAConnectionRequestInfo(transacted, + acknowledgeMode, + sessionType); + info.setUserName(userName); + info.setPassword(password); + info.setClientID(clientID); + info.setDefaults(((ActiveMQResourceAdapter) mcf.getResourceAdapter()).getProperties()); + + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("Allocating session for " + this + " with request info=" + info); + } + + ActiveMQRASession session = (ActiveMQRASession) cm.allocateConnection(mcf, info); + + try + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("Allocated " + this + " session=" + session); + } + + session.setActiveMQSessionFactory(this); + + if (started) + { + session.start(); + } + + sessions.add(session); + + return session; + } + catch (Throwable t) + { + try + { + session.close(); + } + catch (Throwable ignored) + { + } + if (t instanceof Exception) + { + throw (Exception) t; + } + else + { + throw new RuntimeException("Unexpected error: ", t); + } + } + } + } + catch (Exception e) + { + Throwable current = e; + while (current != null && !(current instanceof JMSException)) + { + current = current.getCause(); + } + + if (current != null && current instanceof JMSException) + { + throw (JMSException) current; + } + else + { + JMSException je = new JMSException("Could not create a session: " + e.getMessage()); + je.setLinkedException(e); + je.initCause(e); + throw je; + } + } + } + + /** + * Check if we are closed + * + * @throws IllegalStateException Thrown if closed + */ + protected void checkClosed() throws IllegalStateException + { + if (ActiveMQRASessionFactoryImpl.trace) + { + ActiveMQRALogger.LOGGER.trace("checkClosed()" + this); + } + + if (closed) + { + throw new IllegalStateException("The connection is closed"); + } + } + + private boolean inJtaTransaction() + { + boolean inJtaTx = false; + if (tm != null) + { + Transaction tx = null; + try + { + tx = tm.getTransaction(); + } + catch (SystemException e) + { + //assume false + } + inJtaTx = tx != null; + } + return inJtaTx; + } +}