jstrachan 2002/06/14 10:50:13 Modified: messenger build.xml gump.xml messenger/src/java/org/apache/commons/messenger MessengerSupport.java DefaultMessenger.java Added: messenger/src/java/org/apache/commons/messenger/task ConsumerTask.java messenger/src/java/org/apache/commons/messenger MessengerSession.java Log: Refactored the session pooling code to make things much simpler and clearer. Also, each temporary queue is now owned by only one thread, so that multi-threaded call() invocations should not step on each other's toes. Finally, added a ConsumerTask for consuming JMS messages via Ant. Revision Changes Path 1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/task/ConsumerTask.java Index: ConsumerTask.java =================================================================== /* * Copyright (C) The Apache Software Foundation. All rights reserved. * * This software is published under the terms of the Apache Software License * version 1.1, a copy of which has been included with this distribution in * the LICENSE file. 
* * $Id: ConsumerTask.java,v 1.4 2002/05/17 15:05:47 jstrachan Exp $ */ package org.apache.commons.messenger.task; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.Iterator; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.TextMessage; import org.apache.commons.messenger.Messenger; import org.apache.commons.messenger.MessengerManager; import org.apache.tools.ant.Task; import org.apache.tools.ant.BuildException; import org.apache.tools.ant.Project; /** * <p><code>ConsumerTask</code> is an Ant task which will * publish all of the given text files as a JMS Text Message * using a given JMS Connection (Messenger) and a Destination * * @author <a href="mailto:[EMAIL PROTECTED]">James Strachan</a> * @version $Revision: 1.4 $ */ public class ConsumerTask extends Task { private Messenger messenger; private String messengerName; private Destination destination; private String subject; private MessengerManager messengerManager; /** the number of messages to receive */ private int count; /** the output directory */ private File dir = new File("."); // Properties //------------------------------------------------------------------------- /** * Sets the output directory */ public void setDir(File dir) { this.dir = dir; } public Messenger getMessenger() throws JMSException { if ( messenger == null ) { messenger = getMessengerManager().getMessenger( getMessengerName() ); } return messenger; } /** Sets the Messenger to be used */ public void setMessenger(Messenger messenger) { this.messenger = messenger; } /** Getter for property messengerName. * @return Value of property messengerName. */ public String getMessengerName() { return messengerName; } /** Setter for property messengerName. * @param messengerName New value of property messengerName. */ public void setMessengerName(String messengerName) { this.messengerName = messengerName; } /** Getter for property destination. 
* @return Value of property destination. */ public Destination getDestination() throws JMSException { if ( destination == null ) { destination = getMessenger().getDestination( getSubject() ); } return destination; } /** Setter for property destination. * @param destination New value of property destination. */ public void setDestination(Destination destination) { this.destination = destination; } /** Getter for property subject. * @return Value of property subject. */ public String getSubject() { return subject; } /** Setter for property subject. * @param subject New value of property subject. */ public void setSubject(String subject) { this.subject = subject; } /** Getter for property messengerManager. * @return Value of property messengerManager. */ public MessengerManager getMessengerManager() { return messengerManager; } /** Setter for property messengerManager. * @param messengerManager New value of property messengerManager. */ public void setMessengerManager(MessengerManager messengerManager) { this.messengerManager = messengerManager; } /** * Sets the URI of the Messenger.xml configuration document to use * to configure the messengers to use for this task. */ public void setConfiguration(String uri) throws JMSException { setMessengerManager( MessengerManager.load( uri ) ); } /** * @return the number of messages to receive. * A number less than or equal to 0 will receive messages forever */ public int getCount() { return count; } /** * Setter for the number of messages to receive. * A number less than or equal to 0 will receive messages forever */ public void setCount(int count) { this.count = count; } // Task interface //------------------------------------------------------------------------- /** * Performs the copy operation. 
*/ public void execute() throws BuildException { try { Messenger messenger = getMessenger(); if ( messenger == null ) { throw new BuildException("Must specify a valid Messenger", location ); } Destination destination = getDestination(); if ( destination == null ) { throw new BuildException("Must specify a valid JMS Destination", location ); } if ( count > 0 ) { log( "Will wait until I receive: " + count + " messages and will write to directory: " + dir ); for ( int i = 0; i < count; i++ ) { Message message = messenger.receive( destination ); processMessage( message ); } log( "Finished." ); } else { log( "Infinite loop. Will write to directory: " + dir ); while (true) { Message message = messenger.receive( destination ); processMessage( message ); } } } catch (IOException e) { log( "Caught exception: " + e, Project.MSG_ERR ); throw new BuildException(e, location); } catch (JMSException e) { log( "Caught exception: " + e, Project.MSG_ERR ); throw new BuildException(e, location); } finally { try { // close the JMS connection to release any background threads messenger.close(); } catch (Exception e) { // ignore close exceptions } } } /** * Processes a given message */ protected void processMessage(Message message) throws IOException, JMSException { log( "Received message to: " + message ); String text = null; if ( message instanceof TextMessage ) { TextMessage textMessage = (TextMessage) message; text = textMessage.toString(); } else { // #### bit of a hack!!! 
// ideally we need an XML format for message persistence text = message.toString(); } processMessageText(text); } /** * Writes the given text to a file */ protected void processMessageText(String text) throws IOException { FileWriter writer = new FileWriter( dir ); writer.write ( text ); writer.close(); } } 1.33 +27 -6 jakarta-commons-sandbox/messenger/build.xml Index: build.xml =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/build.xml,v retrieving revision 1.32 retrieving revision 1.33 diff -u -r1.32 -r1.33 --- build.xml 2 Jun 2002 18:19:57 -0000 1.32 +++ build.xml 14 Jun 2002 17:50:13 -0000 1.33 @@ -188,11 +188,18 @@ </target> - <target name="compile" depends="maven:compile, maven:jar-resources"> - <path id="test.classpath"> + <target name="compile" depends="maven:compile"> + <path id="jms.classpath"> <pathelement path="${maven.build.dest}"/> - <path refid="maven.dependency.classpath"/> + <path refid="maven.dependency.classpath"/> + <pathelement location="${lib.repo}/xmlParserAPIs-2.0.0.jar"/> + <pathelement location="${lib.repo}/xercesImpl-2.0.0.jar"/> + <pathelement path="${jms.classes.dir}"/> + <fileset dir="${jms.lib.dir}"> + <include name="**/*.jar"/> + </fileset> </path> + <!-- add the JARs required for a pluggable JMS provider --> </target> <target name="compile.test" depends="compile"/> @@ -313,7 +320,7 @@ <!-- Construct unit test classpath --> - <path id="test.classpath"> + <path id="old.test.classpath"> <pathelement location="${build.home}/classes"/> <pathelement location="${build.home}/tests"/> <pathelement location="${servlet.jar}"/> @@ -340,7 +347,7 @@ </path> <!-- Running sample programs against the configured JMS provider --> - <path id="jms.classpath"> + <path id="old.jms.classpath"> <pathelement location="${build.home}/classes"/> <pathelement location="${build.home}/tests"/> <pathelement location="${servlet.jar}"/> @@ -545,6 +552,20 @@ <jmsSend messengerName="queue" 
configuration="${messenger.xml}" subject="jms/Queue"> <fileset dir="src/conf" excludes="**/*.txt"/> </jmsSend> + </target> + + + <target name="demo.receive.loop" depends="compile.test" + description="Receives a number of messages from a JMS destination using the jmsReceive task"> + + <taskdef + name="jmsReceive" + classname="org.apache.commons.messenger.task.ConsumerTask"> + <classpath refid="jms.classpath"/> + </taskdef> + + <mkdir dir="target/output"/> + <jmsReceive messengerName="queue" configuration="${messenger.xml}" subject="jms/Queue" dir="target/output"/> </target> 1.2 +6 -6 jakarta-commons-sandbox/messenger/gump.xml Index: gump.xml =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/gump.xml,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- gump.xml 7 Jun 2002 08:30:32 -0000 1.1 +++ gump.xml 14 Jun 2002 17:50:13 -0000 1.2 @@ -23,12 +23,12 @@ <depend project="jakarta-ant"/> <depend project="xml-xerces"/> - <depend project="commons-logging"/> - <depend project="commons-beanutils"/> - <depend project="commons-collections"/> - <depend project="commons-digester"/> - <depend project="servlet"/> - <depend project="jms"/> + <depend project="$context.node.id"/> + <depend project="$context.node.id"/> + <depend project="$context.node.id"/> + <depend project="$context.node.id"/> + <depend project="$context.node.id"/> + <depend project="$context.node.id"/> <work nested="target/classes"/> <home nested="target"/> 1.22 +42 -72 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java Index: MessengerSupport.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSupport.java,v retrieving revision 1.21 retrieving revision 1.22 diff -u -r1.21 -r1.22 --- MessengerSupport.java 17 May 2002 15:05:45 -0000 1.21 +++ 
MessengerSupport.java 14 Jun 2002 17:50:13 -0000 1.22 @@ -72,27 +72,12 @@ */ private boolean noLocal; + /** Should we cache the requestor object per thread? */ + private boolean cacheRequestors; + /** A Map of ListenerKey objects to MessageConsumer objects */ private Map listeners = new HashMap(); - /** A Map of MessageConsumer objects indexed by Destination or Destination and selector */ - private Map consumers = new HashMap(); - - /** A Map of MessageProducer objects indexed by Destination */ - private Map producers = new HashMap(); - - ///** A Map of Queue or Topic Requestors indexed by Destination */ - //private Map requestors = new HashMap(); - - private ThreadLocal requestorsMap = new ThreadLocal() { - protected Object initialValue() { - return new HashMap(); - } - }; - - /** The inbox which is used for the call() methods */ - private Destination replyToDestination; - public MessengerSupport() { } @@ -165,12 +150,16 @@ try { if (isTopic(session)) { TopicRequestor requestor = - getTopicRequestor((TopicSession) session, (Topic) destination); + getMessengerSession().getTopicRequestor( + (TopicSession) session, (Topic) destination + ); return requestor.request(message); } else { QueueRequestor requestor = - getQueueRequestor((QueueSession) session, (Queue) destination); + getMessengerSession().getQueueRequestor( + (QueueSession) session, (Queue) destination + ); return requestor.request(message); } } @@ -178,7 +167,10 @@ returnSession(session); } } + /* + * It'd be nice to try replacing the above with this... + * public Message call( Destination destination, Message message ) throws JMSException { Session session = borrowSession(); try { @@ -777,7 +769,18 @@ this.durable = durable; } - /** Returns the durable name used for durable topic based subscriptions */ + /** Gets whether we should cache the requestor object per thread? 
*/ + public boolean isCacheRequestors() { + return cacheRequestors; + } + + /** Sets whether we should cache the requestor object per thread? */ + public void setCacheRequestors(boolean cacheRequestors) { + this.cacheRequestors = cacheRequestors; + } + + + /** @return the durable name used for durable topic based subscriptions */ public String getDurableName() { return durableName; } @@ -810,16 +813,16 @@ /** Borrows a session instance from the pool */ protected abstract Session borrowSession() throws JMSException; - /** Returns a session instance back to the pool */ + /** @return a session instance back to the pool */ protected abstract void returnSession(Session session) throws JMSException; /** Deletes a session instance */ - protected abstract void deleteSession(Session session) throws JMSException; + //protected abstract void deleteSession(Session session) throws JMSException; /** Borrows a session instance from the pool */ protected abstract Session borrowListenerSession() throws JMSException; - /** Returns a session instance back to the pool */ + /** @return a session instance back to the pool */ protected abstract void returnListenerSession(Session session) throws JMSException; @@ -833,7 +836,13 @@ protected abstract boolean isTopic(MessageProducer producer) throws JMSException; - /** Returns a message producer for the given session and destination */ + /** + * @return the current thread's MessengerSession + */ + protected abstract MessengerSession getMessengerSession() throws JMSException; + + + /** @return a message producer for the given session and destination */ protected MessageProducer getMessageProducer( Session session, Destination destination) @@ -848,7 +857,7 @@ */ } - /** Returns a newly created message producer for the given session and destination */ + /** @return a newly created message producer for the given session and destination */ protected MessageProducer createMessageProducer( Session session, Destination destination) @@ -863,7 +872,7 
@@ } } - /** Returns a MessageConsumer for the given session and destination */ + /** @return a MessageConsumer for the given session and destination */ protected MessageConsumer getMessageConsumer( Session session, Destination destination) @@ -878,7 +887,7 @@ */ } - /** Returns a MessageConsumer for the given session, destination and selector */ + /** @return a MessageConsumer for the given session, destination and selector */ protected MessageConsumer getMessageConsumer( Session session, Destination destination, @@ -888,7 +897,7 @@ return createMessageConsumer(session, destination, selector); } - /** Returns a new MessageConsumer for the given session and destination */ + /** @return a new MessageConsumer for the given session and destination */ protected MessageConsumer createMessageConsumer( Session session, Destination destination) @@ -910,7 +919,7 @@ } } - /** Returns a new MessageConsumer for the given session, destination and selector */ + /** @return a new MessageConsumer for the given session, destination and selector */ protected MessageConsumer createMessageConsumer( Session session, Destination destination, @@ -951,45 +960,6 @@ } protected Destination getReplyToDestination() throws JMSException { - if (replyToDestination == null) { - replyToDestination = createTemporaryDestination(); - } - return replyToDestination; - } - - protected TopicRequestor getTopicRequestor( - TopicSession session, - Topic destination) - throws JMSException { - if (CACHE_REQUESTOR) { - Map requestors = (Map) requestorsMap.get(); - TopicRequestor requestor = (TopicRequestor) requestors.get(destination); - if (requestor == null) { - requestor = new TopicRequestor(session, destination); - requestors.put(destination, requestor); - } - return requestor; - } - else { - return new TopicRequestor(session, destination); - } - } - - protected QueueRequestor getQueueRequestor( - QueueSession session, - Queue destination) - throws JMSException { - if (CACHE_REQUESTOR) { - Map requestors = 
(Map) requestorsMap.get(); - QueueRequestor requestor = (QueueRequestor) requestors.get(destination); - if (requestor == null) { - requestor = new QueueRequestor(session, destination); - requestors.put(destination, requestor); - } - return requestor; - } - else { - return new QueueRequestor(session, destination); - } + return getMessengerSession().getReplyToDestination(); } } 1.11 +30 -52 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java Index: DefaultMessenger.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/DefaultMessenger.java,v retrieving revision 1.10 retrieving revision 1.11 diff -u -r1.10 -r1.11 --- DefaultMessenger.java 17 May 2002 15:05:45 -0000 1.10 +++ DefaultMessenger.java 14 Jun 2002 17:50:13 -0000 1.11 @@ -45,19 +45,12 @@ /** Logger */ private static final Log log = LogFactory.getLog(DefaultMessenger.class); - /** the session object for each thread */ - private ThreadLocal sessionPool = new ThreadLocal(); + /** the MessengerSession for each thread */ + private ThreadLocal messengerSessionPool = new ThreadLocal(); - /** the listener session object for each thread */ - private ThreadLocal listenerSessionPool = new ThreadLocal(); - - /** The factory used to create each thread's JMS Session */ + /** the SessionFactory used to create new JMS sessions */ private SessionFactory sessionFactory; - /** A pool of Connections, one per thread */ - private ThreadLocal connectionPool = new ThreadLocal(); - - public DefaultMessenger() { } @@ -75,21 +68,7 @@ } public Connection getConnection() throws JMSException { - if (SHARE_CONNECTION) { - return getSessionFactory().getConnection(); - } - else { - Connection answer = (Connection) connectionPool.get(); - if (answer == null) { - answer = getSessionFactory().createConnection(); - if (log.isInfoEnabled()) { - log.info( - "Created connection: " + answer + " for 
thread: " + Thread.currentThread()); - } - connectionPool.set(answer); - } - return answer; - } + return getSessionFactory().getConnection(); } public ServerSessionPool createServerSessionPool( @@ -102,59 +81,58 @@ public void close() throws JMSException { getSessionFactory().close(); // clear all the pools... - sessionPool = new ThreadLocal(); - listenerSessionPool = new ThreadLocal(); + messengerSessionPool = new ThreadLocal(); } // Implementation methods //------------------------------------------------------------------------- protected boolean isTopic(Connection connection) throws JMSException { - return sessionFactory.isTopic(); + return getSessionFactory().isTopic(); } protected boolean isTopic(ConnectionFactory factory) throws JMSException { - return sessionFactory.isTopic(); + return getSessionFactory().isTopic(); } protected boolean isTopic(Session session) throws JMSException { - return sessionFactory.isTopic(); + return getSessionFactory().isTopic(); } protected boolean isTopic(MessageProducer producer) throws JMSException { - return sessionFactory.isTopic(); + return getSessionFactory().isTopic(); } protected Session borrowSession() throws JMSException { - Session answer = (Session) sessionPool.get(); - if (answer == null) { - answer = createSession(); - sessionPool.set(answer); - } - return answer; + return getMessengerSession().getSession(); } protected void returnSession(Session session) { } - protected void deleteSession(Session session) throws JMSException { - sessionPool.set(null); + protected Session borrowListenerSession() throws JMSException { + return getMessengerSession().getListenerSession(); } - protected Session borrowListenerSession() throws JMSException { - Session answer = (Session) listenerSessionPool.get(); + protected void returnListenerSession(Session session) throws JMSException { + } + + /** + * @return the current thread's MessengerSession + */ + protected MessengerSession getMessengerSession() throws JMSException { + 
MessengerSession answer = (MessengerSession) messengerSessionPool.get(); if (answer == null) { - answer = createSession(); - listenerSessionPool.set(answer); + answer = createMessengerSession(); + messengerSessionPool.set(answer); } return answer; } - - protected void returnListenerSession(Session session) throws JMSException { - } - - /** Factory method to create a new JMS Session */ - protected Session createSession() throws JMSException { - return getSessionFactory().createSession(getConnection()); + + /** + * Factory method to create a new MessengerSession + */ + protected MessengerSession createMessengerSession() throws JMSException { + return new MessengerSession( this, getSessionFactory() ); } /** Factory method to create a SessionFactory. 1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messenger/MessengerSession.java Index: MessengerSession.java =================================================================== /* * Copyright (C) The Apache Software Foundation. All rights reserved. * * This software is published under the terms of the Apache Software License * version 1.1, a copy of which has been included with this distribution in * the LICENSE file. 
* * $Id: MessengerDigester.java,v 1.4 2001/11/13 12:46:10 jstrachan Exp $ */
package org.apache.commons.messenger;

import java.util.HashMap;
import java.util.Map;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueRequestor;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;

/** <p><code>MessengerSession</code> represents all the local information for a single thread.</p>
 *
 * <p>The JMS sessions, the reply-to destination and the requestor cache
 * are all created lazily on first use. This class performs no
 * synchronization of its own.</p>
 *
 * @author <a href="mailto:[EMAIL PROTECTED]">James Strachan</a>
 * @version $Revision: 1.4 $
 */
public class MessengerSession {

    /** the JMS Session for this thread */
    private Session session;

    /** the JMS Listener (async subscription) Session for this thread */
    private Session listenerSession;

    /** The factory used to create each thread's JMS Session */
    private SessionFactory sessionFactory;

    /** An optional cache of requestors */
    private Map requestorsMap;

    /** The inbox which is used for the call() methods */
    private Destination replyToDestination;

    /** The current messenger to which I'm connected */
    private MessengerSupport messenger;


    /**
     * @param messenger the messenger which owns this session; it supplies
     *  the JMS connection and the requestor caching policy
     * @param sessionFactory the factory used to create the JMS sessions
     */
    public MessengerSession(MessengerSupport messenger, SessionFactory sessionFactory) {
        this.messenger = messenger;
        this.sessionFactory = sessionFactory;
    }

    /**
     * @return the factory used to create the JMS sessions
     */
    public SessionFactory getSessionFactory() {
        return sessionFactory;
    }

    /**
     * @return the JMS Session for this thread for synchronous mode
     */
    public Session getSession() throws JMSException {
        if ( session == null ) {
            session = createSession();
        }
        return session;
    }

    /**
     * @return the JMS Session for this thread for asynchronous mode
     */
    public Session getListenerSession() throws JMSException {
        if ( listenerSession == null ) {
            listenerSession = createSession();
        }
        return listenerSession;
    }

    /**
     * @return the reply to destination (a temporary topic or queue)
     *  used to reply to this thread and session
     */
    protected Destination getReplyToDestination() throws JMSException {
        if (replyToDestination == null) {
            replyToDestination = createTemporaryDestination();
        }
        return replyToDestination;
    }

    /**
     * @return either a cached TopicRequestor or creates a new one.
     *  A requestor is only cached, keyed by destination, if the
     *  messenger has requestor caching enabled
     */
    public TopicRequestor getTopicRequestor(
        TopicSession session,
        Topic destination)
        throws JMSException {
        if (messenger.isCacheRequestors()) {
            Map requestors = getRequestorsMap();
            TopicRequestor requestor = (TopicRequestor) requestors.get(destination);
            if (requestor == null) {
                requestor = new TopicRequestor(session, destination);
                requestors.put(destination, requestor);
            }
            return requestor;
        }
        else {
            return new TopicRequestor(session, destination);
        }
    }

    /**
     * @return either a cached QueueRequestor or creates a new one.
     *  A requestor is only cached, keyed by destination, if the
     *  messenger has requestor caching enabled
     */
    public QueueRequestor getQueueRequestor(
        QueueSession session,
        Queue destination)
        throws JMSException {
        if (messenger.isCacheRequestors()) {
            Map requestors = getRequestorsMap();
            QueueRequestor requestor = (QueueRequestor) requestors.get(destination);
            if (requestor == null) {
                requestor = new QueueRequestor(session, destination);
                requestors.put(destination, requestor);
            }
            return requestor;
        }
        else {
            return new QueueRequestor(session, destination);
        }
    }

    /**
     * Factory method to create a new JMS Session
     * on the connection of the owning messenger
     */
    protected Session createSession() throws JMSException {
        return getSessionFactory().createSession(messenger.getConnection());
    }

    /**
     * Factory method to create a new temporary destination,
     * which is a temporary topic if the messenger is topic based
     * and a temporary queue otherwise
     */
    protected Destination createTemporaryDestination() throws JMSException {
        // go through the lazy getter: the session field is still null
        // if no synchronous session has been requested on this instance yet
        Session current = getSession();
        if (messenger.isTopic(current)) {
            TopicSession topicSession = (TopicSession) current;
            return topicSession.createTemporaryTopic();
        }
        else {
            QueueSession queueSession = (QueueSession) current;
            return queueSession.createTemporaryQueue();
        }
    }

    /**
     * @return the map of requestors, indexed by destination.
     *  The Map will be lazily constructed
     */
    protected Map getRequestorsMap() {
        if ( requestorsMap == null ) {
            requestorsMap = new HashMap();
        }
        return requestorsMap;
    }
}
-- To unsubscribe, e-mail: <mailto:[EMAIL PROTECTED]> For additional commands, e-mail: <mailto:[EMAIL PROTECTED]>