jstrachan 01/08/30 05:18:06 Modified: messenger/src/java/org/apache/commons/messenger Messenger.java MessengerSupport.java messenger/src/test/org/apache/commons/messenger TestMessenger.java Log: Modified Messenger API to use Destination rather than a String subject to allow cleaner JMS integration, particularly when sending a reply to a JMS message Revision Changes Path 1.4 +27 -23 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java Index: Messenger.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/Messenger.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- Messenger.java 2001/08/28 22:38:28 1.3 +++ Messenger.java 2001/08/30 12:18:06 1.4 @@ -5,13 +5,14 @@ * version 1.1, a copy of which has been included with this distribution in * the LICENSE file. * - * $Id: Messenger.java,v 1.3 2001/08/28 22:38:28 jstrachan Exp $ + * $Id: Messenger.java,v 1.4 2001/08/30 12:18:06 jstrachan Exp $ */ package org.apache.commons.messenger; import java.io.Serializable; import javax.jms.BytesMessage; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; @@ -28,47 +29,50 @@ * taglib) will use the same JMS Session.</p> * * @author <a href="mailto:[EMAIL PROTECTED]">James Strachan</a> - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public interface Messenger { /** Temporary hack - this method has been added so that Messenger works better with the digester */ public String getName(); + + /** Returns the destination for the given subject name */ + public Destination getDestination(String subject) throws JMSException; - /** Sends a message on the given subject name */ - public void send(String subject, Message message) throws JMSException; + /** Sends a message on the given destination */ + public void send(Destination destination, Message message) throws JMSException; - /** Sends a message on the given subject and blocks until a response is returned */ - public Message call(String subject, Message message) throws JMSException; + /** Sends a message on the given destination and blocks until a response is returned */ + public Message call(Destination destination, Message message) throws JMSException; - /** Receives a message on the given subject name, blocking until one is returned */ - public Message receive(String subject) throws JMSException; + /** Receives a message on the given destination, blocking until one is returned */ + public Message receive(Destination destination) throws JMSException; - /** Receives a message on the given subject name and message selector, blocking until one is returned */ - public Message receive(String subject, String selector) throws JMSException; + /** Receives a message on the given destination and message selector, blocking until one is returned */ + public Message receive(Destination destination, String selector) throws JMSException; - /** Receives a message on the given subject name, blocking for the specified timeout */ - public Message receive(String subject, long timeoutMillis) throws JMSException; + /** Receives a message on the given destination, blocking for the specified timeout */ + public Message receive(Destination destination, long timeoutMillis) throws JMSException; - /** Receives a message on the given subject name and selector, blocking for the specified timeout */ - public Message receive(String subject, String selector, long timeoutMillis) throws JMSException; + /** Receives a message on the given destination and selector, blocking for the specified timeout */ + public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException; - /** Receives a message on the given subject name without blocking or returns null */ - public Message receiveNoWait(String subject) throws JMSException; + /** Receives a message on the given destination without blocking or returns null */ + public Message receiveNoWait(Destination destination) throws JMSException; - /** Receives a message on the given subject name and selector without blocking or returns null */ - public Message receiveNoWait(String subject, String selector) throws JMSException; + /** Receives a message on the given destination and selector without blocking or returns null */ + public Message receiveNoWait(Destination destination, String selector) throws JMSException; // Listener API //------------------------------------------------------------------------- - /** Adds a message listener on the given subject */ - public void addListener(String subject, MessageListener listener) throws JMSException; - public void addListener(String subject, String selector, MessageListener listener) throws JMSException; + /** Adds a message listener on the given destination */ + public void addListener(Destination destination, MessageListener listener) throws JMSException; + public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException; - public void removeListener(String subject, MessageListener listener) throws JMSException; - public void removeListener(String subject, String selector, MessageListener listener) throws JMSException; + public void removeListener(Destination destination, MessageListener listener) throws JMSException; + public void removeListener(Destination destination, String selector, MessageListener listener) throws JMSException; // Message factory methods 1.4 +61 -53 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java Index: MessengerSupport.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- MessengerSupport.java 2001/08/28 22:38:28 1.3 +++ MessengerSupport.java 2001/08/30 12:18:06 1.4 @@ -5,13 +5,14 @@ * version 1.1, a copy of which has been included with this distribution in * the LICENSE file. * - * $Id: MessengerSupport.java,v 1.3 2001/08/28 22:38:28 jstrachan Exp $ + * $Id: MessengerSupport.java,v 1.4 2001/08/30 12:18:06 jstrachan Exp $ */ package org.apache.commons.messenger; import java.io.Serializable; import javax.jms.BytesMessage; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; @@ -37,7 +38,7 @@ * connection and session creation and the pooling strategy.</p> * * @author <a href="mailto:[EMAIL PROTECTED]">James Strachan</a> - * @version $Revision: 1.3 $ + * @version $Revision: 1.4 $ */ public abstract class MessengerSupport implements Messenger { @@ -58,10 +59,25 @@ public MessengerSupport() { } - public void send( String subject, Message message ) throws JMSException { + public Destination getDestination(String subject) throws JMSException { Session session = borrowSession(); try { - MessageProducer producer = getMessageProducer( session, subject ); + if ( session instanceof TopicSession ) { + return getTopic( (TopicSession) session, subject ); + } + else { + return getQueue( (QueueSession) session, subject ); + } + } + finally { + returnSession( session ); + } + } + + public void send( Destination destination, Message message ) throws JMSException { + Session session = borrowSession(); + try { + MessageProducer producer = getMessageProducer( session, destination ); if ( producer instanceof TopicPublisher ) { ((TopicPublisher) producer).publish( message ); } @@ -74,14 +90,14 @@ } } - public Message call( String subject, Message message ) throws JMSException { + public Message call( Destination destination, Message message ) throws JMSException { Session session = borrowSession(); try { if ( session instanceof TopicSession ) { TopicSession topicSession = (TopicSession) session; TopicRequestor requestor = new TopicRequestor( topicSession, - getTopic( topicSession, subject ) + (Topic) destination ); return requestor.request( message ); } @@ -89,7 +105,7 @@ QueueSession queueSession = (QueueSession) session; QueueRequestor requestor = new QueueRequestor( queueSession, - getQueue( queueSession, subject ) + (Queue) destination ); return requestor.request( message ); } @@ -99,10 +115,10 @@ } } - public Message receive(String subject) throws JMSException { + public Message receive(Destination destination) throws JMSException { Session session = borrowSession(); try { - MessageConsumer consumer = getMessageConsumer( session, subject ); + MessageConsumer consumer = getMessageConsumer( session, destination ); return consumer.receive(); } finally { @@ -110,10 +126,10 @@ } } - public Message receive(String subject, String selector) throws JMSException { + public Message receive(Destination destination, String selector) throws JMSException { Session session = borrowSession(); try { - MessageConsumer consumer = getMessageConsumer( session, subject, selector ); + MessageConsumer consumer = getMessageConsumer( session, destination, selector ); return consumer.receive(); } finally { @@ -121,10 +137,10 @@ } } - public Message receive(String subject, long timeoutMillis) throws JMSException { + public Message receive(Destination destination, long timeoutMillis) throws JMSException { Session session = borrowSession(); try { - MessageConsumer consumer = getMessageConsumer( session, subject ); + MessageConsumer consumer = getMessageConsumer( session, destination ); return consumer.receive(timeoutMillis); } finally { @@ -132,10 +148,10 @@ } } - public Message receive(String subject, String selector, long timeoutMillis) throws JMSException { + public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException { Session session = borrowSession(); try { - MessageConsumer consumer = getMessageConsumer( session, subject, selector ); + MessageConsumer consumer = getMessageConsumer( session, destination, selector ); return consumer.receive(timeoutMillis); } finally { @@ -143,10 +159,10 @@ } } - public Message receiveNoWait(String subject) throws JMSException { + public Message receiveNoWait(Destination destination) throws JMSException { Session session = borrowSession(); try { - MessageConsumer consumer = getMessageConsumer( session, subject ); + MessageConsumer consumer = getMessageConsumer( session, destination ); return consumer.receiveNoWait(); } finally { @@ -154,10 +170,10 @@ } } - public Message receiveNoWait(String subject, String selector) throws JMSException { + public Message receiveNoWait(Destination destination, String selector) throws JMSException { Session session = borrowSession(); try { - MessageConsumer consumer = getMessageConsumer( session, subject, selector ); + MessageConsumer consumer = getMessageConsumer( session, destination, selector ); return consumer.receiveNoWait(); } finally { @@ -169,10 +185,10 @@ // Listener API //------------------------------------------------------------------------- - public void addListener(String subject, MessageListener listener) throws JMSException { + public void addListener(Destination destination, MessageListener listener) throws JMSException { Session session = borrowSession(); try { - MessageConsumer consumer = createMessageConsumer( session, subject ); + MessageConsumer consumer = createMessageConsumer( session, destination ); consumer.setMessageListener( listener ); } finally { @@ -180,10 +196,10 @@ } } - public void addListener(String subject, String selector, MessageListener listener) throws JMSException { + public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException { Session session = borrowSession(); try { - MessageConsumer consumer = createMessageConsumer( session, subject, selector ); + MessageConsumer consumer = createMessageConsumer( session, destination, selector ); consumer.setMessageListener( listener ); } finally { @@ -192,12 +208,12 @@ } - public void removeListener(String subject, MessageListener listener ) throws JMSException { + public void removeListener(Destination destination, MessageListener listener ) throws JMSException { // we need to iterate through all sessions to find which one has a listener for throw new JMSException( "Not implemented yet" ); } - public void removeListener(String subject, String selector, MessageListener listener ) throws JMSException { + public void removeListener(Destination destination, String selector, MessageListener listener ) throws JMSException { // we need to iterate through all sessions to find which one has a listener for throw new JMSException( "Not implemented yet" ); } @@ -373,59 +389,59 @@ /** Deletes a session instance */ protected abstract void deleteSession(Session session) throws JMSException; - /** Returns a message producer for the given session and subject */ - protected MessageProducer getMessageProducer( Session session, String subject ) throws JMSException { + /** Returns a message producer for the given session and destination */ + protected MessageProducer getMessageProducer( Session session, Destination destination ) throws JMSException { if ( session instanceof TopicSession ) { TopicSession topicSession = (TopicSession) session; - return topicSession.createPublisher( getTopic(topicSession, subject) ); + return topicSession.createPublisher( (Topic) destination ); } else { QueueSession queueSession = (QueueSession) session; - return queueSession.createSender( getQueue(queueSession, subject) ); + return queueSession.createSender( (Queue) destination ); } } - /** Returns a MessageConsumer for the given session and subject */ - protected MessageConsumer getMessageConsumer( Session session, String subject ) throws JMSException { + /** Returns a MessageConsumer for the given session and destination */ + protected MessageConsumer getMessageConsumer( Session session, Destination destination ) throws JMSException { // could do caching one day - return createMessageConsumer( session, subject ); + return createMessageConsumer( session, destination ); } - /** Returns a MessageConsumer for the given session, subject and selector */ - protected MessageConsumer getMessageConsumer( Session session, String subject, String selector ) throws JMSException { + /** Returns a MessageConsumer for the given session, destination and selector */ + protected MessageConsumer getMessageConsumer( Session session, Destination destination, String selector ) throws JMSException { // could do caching one day - return createMessageConsumer( session, subject, selector ); + return createMessageConsumer( session, destination, selector ); } - /** Returns a new MessageConsumer for the given session and subject */ - protected MessageConsumer createMessageConsumer( Session session, String subject ) throws JMSException { + /** Returns a new MessageConsumer for the given session and destination */ + protected MessageConsumer createMessageConsumer( Session session, Destination destination ) throws JMSException { if ( session instanceof TopicSession ) { TopicSession topicSession = (TopicSession) session; if ( isDurable() ) { return topicSession.createDurableSubscriber( - getTopic(topicSession, subject), + (Topic) destination, getDurableName() ); } else { return topicSession.createSubscriber( - getTopic(topicSession, subject) + (Topic) destination ); } } else { QueueSession queueSession = (QueueSession) session; - return queueSession.createReceiver( getQueue(queueSession, subject) ); + return queueSession.createReceiver( (Queue) destination ); } } - /** Returns a new MessageConsumer for the given session, subject and selector */ - protected MessageConsumer createMessageConsumer( Session session, String subject, String selector ) throws JMSException { + /** Returns a new MessageConsumer for the given session, destination and selector */ + protected MessageConsumer createMessageConsumer( Session session, Destination destination, String selector ) throws JMSException { if ( session instanceof TopicSession ) { TopicSession topicSession = (TopicSession) session; if ( isDurable() ) { return topicSession.createDurableSubscriber( - getTopic(topicSession, subject), + (Topic) destination, getDurableName(), selector, isNoLocal() @@ -433,7 +449,7 @@ } else { return topicSession.createSubscriber( - getTopic(topicSession, subject), + (Topic) destination, selector, isNoLocal() ); @@ -442,23 +458,15 @@ else { QueueSession queueSession = (QueueSession) session; return queueSession.createReceiver( - getQueue(queueSession, subject), + (Queue) destination, selector ); } } - protected Queue getQueue(Session session, String subject) throws JMSException { - return getQueue((QueueSession) session, subject); - } - protected Queue getQueue(QueueSession session, String subject) throws JMSException { // XXXX: might want to cache return session.createQueue( subject ); - } - - protected Topic getTopic(Session session, String subject) throws JMSException { - return getTopic((TopicSession) session, subject); } protected Topic getTopic(TopicSession session, String subject) throws JMSException { 1.3 +15 -10 jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java Index: TestMessenger.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/test/org/apache/commons/messenger/TestMessenger.java,v retrieving revision 1.2 retrieving revision 1.3 diff -u -r1.2 -r1.3 --- TestMessenger.java 2001/08/28 22:38:29 1.2 +++ TestMessenger.java 2001/08/30 12:18:06 1.3 @@ -5,7 +5,7 @@ * version 1.1, a copy of which has been included with this distribution in * the LICENSE file. * - * $Id: TestMessenger.java,v 1.2 2001/08/28 22:38:29 jstrachan Exp $ + * $Id: TestMessenger.java,v 1.3 2001/08/30 12:18:06 jstrachan Exp $ */ package org.apache.commons.messenger; @@ -13,6 +13,7 @@ import java.util.ArrayList; import java.util.List; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.TextMessage; @@ -23,15 +24,15 @@ /** Test harness for Messenger * * @author <a href="mailto:[EMAIL PROTECTED]">James Strachan</a> - * @version $Revision: 1.2 $ + * @version $Revision: 1.3 $ */ public class TestMessenger extends TestCase { protected static boolean verbose = true; protected List failures = new ArrayList(); - protected String topic = "jms/Topic"; - protected String queue = "jms/Queue"; + protected String topicName = "jms/Topic"; + protected String queueName = "jms/Queue"; protected String topicMessageText = "This is the text of a topic message"; protected String queueMessageText = "This is the text of a queue message"; @@ -61,8 +62,9 @@ public void testSendTopic() throws Exception { Messenger messenger = MessengerManager.get( "topic" ); + Destination topic = messenger.getDestination( topicName ); - clearSubject( messenger, topic ); + flushDestination( messenger, topic ); Thread thread = new Thread() { public void run() { @@ -95,8 +97,9 @@ public void testSendQueue() throws Exception { Messenger messenger = MessengerManager.get( "queue" ); + Destination queue = messenger.getDestination( queueName ); - clearSubject( messenger, queue ); + flushDestination( messenger, queue ); Thread thread = new Thread() { public void run() { @@ -130,12 +133,12 @@ protected void setUp() throws Exception { } - protected void clearSubject(Messenger messenger, String subject) throws Exception { - log( "Clearing messenger subject: " + subject ); + protected void flushDestination(Messenger messenger, Destination destination) throws Exception { + log( "Clearing messenger destination: " + destination ); // lets remove any existing messages while (true) { - Message m = messenger.receiveNoWait( topic ); + Message m = messenger.receiveNoWait( destination ); if ( m != null ) { log( "Ignoring message: " + m ); } @@ -144,11 +147,12 @@ } } - log( "Cleared messenger subject: " + subject ); + log( "Cleared messenger destination: " + destination ); } protected void receiveTopicMessage() throws Exception { Messenger messenger = MessengerManager.get( "topic" ); + Destination topic = messenger.getDestination( topicName ); log( "Calling receive() on topic" ); @@ -162,6 +166,7 @@ protected void receiveQueueMessage() throws Exception { Messenger messenger = MessengerManager.get( "queue" ); + Destination queue = messenger.getDestination( queueName ); log( "Calling receive() on queue" );