jstrachan 2002/10/21 13:31:28 Modified: messenger/src/java/org/apache/commons/messagelet Main.java ManagerServlet.java messenger/src/java/org/apache/commons/messagelet/model SubscriptionDigester.java Subscription.java messenger maven.xml messenger/src/conf subscribe.xml Added: messenger/src/java/org/apache/commons/messagelet SubscriptionManager.java ConsumerThread.java Log: Refactored the subscription mechanism into a reusable SubscriptionManager. This allows code to be shared between the stand alone messagelet engine and the ManagerServlet. Also added an option for a <consumerThread> which allows for a plugin point for performing transactional message consumption Revision Changes Path 1.6 +5 -73 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/Main.java Index: Main.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/Main.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- Main.java 7 Oct 2002 10:38:23 -0000 1.5 +++ Main.java 21 Oct 2002 20:31:27 -0000 1.6 @@ -92,11 +92,12 @@ public void run() throws Exception { // force lazy construction - getMessengerManager(); + SubscriptionManager subscriber = new SubscriptionManager(); + subscriber.setMessengerManager( getMessengerManager() ); + subscriber.setSubscriptionList( createSubscriptionList() ); + subscriber.setServletContext( getServletContext() ); - // load the subscriptions.... - subscriptionList = createSubscriptionList(); - subscribe( subscriptionList ); + subscriber.subscribe(); // now lets start all the connections... 
for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); ) { @@ -152,65 +153,6 @@ // Implementation methods //------------------------------------------------------------------------- - protected void subscribe( SubscriptionList list ) throws JMSException, ServletException { - for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) { - Subscription subscription = (Subscription) iter.next(); - subscribe( subscription ); - } - } - - protected void subscribe( Subscription subscription ) throws JMSException, ServletException { - String name = subscription.getConnection(); - Messenger messenger = getMessenger( name ); - if ( messenger == null ) { - throw new JMSException( "No such Messenger called: " + name + " for subscription: " + subscription ); - } - MessageListener listener = subscription.getMessageListener(); - if ( listener == null ) { - throw new JMSException( "No MessageListener is defined for subscription: " + subscription ); - } - - // if its an MDO the initialise it! 
- if ( listener instanceof MessageDrivenObject ) { - MessageDrivenObject mdo = (MessageDrivenObject) listener; - if ( mdo instanceof MessengerMDO ) { - MessengerMDO messengerMDO = (MessengerMDO) mdo; - messengerMDO.setMessenger( messenger ); - messengerMDO.setMessengerManager( getMessengerManager() ); - } - mdo.init( getServletContext() ); - } - - listener = wrapInStopWatch( listener ); - - String subject = subscription.getSubject(); - if ( subject == null || subject.length() == 0 ) { - throw new JMSException( "No destination defined for subscription: " + subscription ); - } - - Destination destination = messenger.getDestination( subject ); - if ( destination == null ) { - throw new JMSException( "No destination could be found for name: " + subject + " for subscription: " + subscription ); - } - - // #### at this point we may wish to create a thread pool of multiple threads - // #### each consuming from the same Destination in parallel - - String selector = subscription.getSelector(); - if ( selector != null && selector.length() > 0 ) { - log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector ); - - messenger.addListener( destination, selector, listener ); - } - else { - log.info( "Subscribing to messenger: " + name + " destination: " + subject ); - - messenger.addListener( destination, listener ); - } - - log.info( "Subscribed with listener: " + listener ); - } - protected MessengerManager createMessengerManager() throws JMSException { String config = connectionsConfig; @@ -252,16 +194,6 @@ return null; } - /** - * Allows the MessageListener to be wrapped inside a stop watch message listener if required - */ - protected MessageListener wrapInStopWatch( MessageListener listener ) { - if ( useStopWatch ) { - return new StopWatchMessageListener( listener ); - } - return listener; - } - /** * This method blocks the current thread indefinitely until the JVM is terminated. 
*/ 1.14 +51 -200 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ManagerServlet.java Index: ManagerServlet.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ManagerServlet.java,v retrieving revision 1.13 retrieving revision 1.14 diff -u -r1.13 -r1.14 --- ManagerServlet.java 13 Aug 2002 07:19:10 -0000 1.13 +++ ManagerServlet.java 21 Oct 2002 20:31:27 -0000 1.14 @@ -40,14 +40,12 @@ */ public class ManagerServlet extends GenericServlet { - private static final String KEY_MESSENGER_MANAGER = MessengerManager.class.getName(); - private static final String KEY_SUBSCRIPTIONLIST = SubscriptionList.class.getName(); - private static final String KEY_CONNECTIONS = "connections"; - private static final String KEY_SUBSCRIPTIONS = "subscriptions"; - /** Should HTTP servlets be used or generic servlets. If true then JSP can be dispatched to easily */ private static final boolean USE_HTTP_SERVLETS = true; + private static final String KEY_CONNECTIONS = "connections"; + private static final String KEY_SUBSCRIPTIONS = "subscriptions"; + /** * Whether exceptions occurring during subscriptions on startup should * terminate the initialization @@ -57,40 +55,15 @@ public ManagerServlet() { } - public MessengerManager getMessengerManager() { - return (MessengerManager) getServletContext().getAttribute( KEY_MESSENGER_MANAGER ); - } - - public void setMessengerManager(MessengerManager messengerManager) { - if ( messengerManager == null ) { - getServletContext().removeAttribute( KEY_MESSENGER_MANAGER ); - } - else { - getServletContext().setAttribute( KEY_MESSENGER_MANAGER, messengerManager ); + public SubscriptionManager getSubscriptionManager() { + SubscriptionManager answer = (SubscriptionManager) getServletContext().getAttribute( "subscriptionManager" ); + if (answer == null) { + answer = new SubscriptionManager(); + 
getServletContext().setAttribute( "subscriptionManager", answer ); } + return answer; } - - public SubscriptionList getSubscriptionList() { - return (SubscriptionList) getServletContext().getAttribute( KEY_SUBSCRIPTIONLIST ); - } - - public void setSubscriptionList(SubscriptionList subscriptionList) { - if ( subscriptionList == null ) { - getServletContext().removeAttribute( KEY_SUBSCRIPTIONLIST ); - } - else { - getServletContext().setAttribute( KEY_SUBSCRIPTIONLIST, subscriptionList ); - } - } - - public Messenger getMessenger(String name) throws ServletException { - MessengerManager messengerManager = getMessengerManager(); - if ( messengerManager == null ) { - throw new ServletException( "No MessengerManager has been initialized yet" ); - } - return messengerManager.getMessenger( name ); - } - + // Servlet methods //------------------------------------------------------------------------- @@ -101,44 +74,56 @@ } // ensure Messenger is initialised - MessengerManager manager = getMessengerManager(); - if ( manager == null ) { - manager = createMessengerManager(); - setMessengerManager( manager ); - - // load the subscriptions.... - SubscriptionList list = createSubscriptionList(); - subscribe( list ); - setSubscriptionList( list ); - - // now lets start all the connections... - for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); ) { - String name = (String) iter.next(); - Messenger messenger = manager.getMessenger( name ); - try { - messenger.getConnection().start(); - } - catch (JMSException e) { - log( "Caught exception trying to start messenger: " + name + ". 
Exception: " + e, e ); + try { + SubscriptionManager subscriber = getSubscriptionManager(); + MessengerManager manager = subscriber.getMessengerManager(); + if ( manager == null ) { + manager = createMessengerManager(); + + subscriber.setMessengerManager( manager ); + subscriber.setSubscriptionList( createSubscriptionList() ); + subscriber.setServletContext( getServletContext() ); + + // load the subscriptions.... + subscriber.subscribe(); + + // now lets start all the connections... + for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); ) { + String name = (String) iter.next(); + Messenger messenger = manager.getMessenger( name ); + try { + messenger.getConnection().start(); + } + catch (JMSException e) { + log( "Caught exception trying to start messenger: " + name + ". Exception: " + e, e ); + } } } } + catch (JMSException e) { + throw new ServletException("Failed to initialize: " + e, e ); + } } public void destroy() { try { - destroyMBOs(); + getSubscriptionManager().unsubscribe(); } - catch (ServletException e) { + catch (Exception e) { log( "Failed to destrory the MBOs: " + e, e ); } - - MessengerManager manager = getMessengerManager(); - if ( manager != null ) { - log( "Closing the Messenger connections" ); - manager.close(); + + try { + MessengerManager manager = getSubscriptionManager().getMessengerManager(); + if ( manager != null ) { + log( "Closing the Messenger connections" ); + manager.close(); + } } - setMessengerManager( null ); + catch (Exception e) { + log( "Failed to close the Messenger Manager: " + e, e ); + } + getSubscriptionManager().setMessengerManager( null ); } public void service(ServletRequest request, ServletResponse response) throws ServletException { @@ -162,140 +147,6 @@ // Implementation methods //------------------------------------------------------------------------- - protected void subscribe( SubscriptionList list ) throws ServletException { - for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); 
) { - Subscription subscription = (Subscription) iter.next(); - subscribe( subscription ); - } - } - - protected void subscribe( Subscription subscription ) throws ServletException { - String name = subscription.getConnection(); - Messenger messenger = getMessenger( name ); - if ( messenger == null ) { - throw new ServletException( "No such Messenger called: " + name + " for subscription: " + subscription ); - } - MessageListener listener = null; - String servlet = subscription.getServlet(); - if ( servlet != null ) { - if ( USE_HTTP_SERVLETS ) { - listener = new MessageHttpServletDispatcher( servlet ); - } - else { - listener = new MessageServletDispatcher( servlet ); - } - } - else { - listener = subscription.getMessageListener(); - if ( listener == null ) { - throw new ServletException( "No MessageListener is defined for subscription: " + subscription ); - } - } - - // if its an MDO the initialise it! - if ( listener instanceof MessageDrivenObject ) { - MessageDrivenObject mdo = (MessageDrivenObject) listener; - if ( mdo instanceof MessengerMDO ) { - MessengerMDO messengerMDO = (MessengerMDO) mdo; - messengerMDO.setMessenger( messenger ); - messengerMDO.setMessengerManager( getMessengerManager() ); - } - mdo.init( getServletContext() ); - } - - String subject = subscription.getSubject(); - if ( subject == null || subject.length() == 0 ) { - throw new ServletException( "No destination defined for subscription: " + subscription ); - } - - Destination destination = null; - try { - destination = messenger.getDestination( subject ); - } - catch (JMSException e) { - handleJMSException( "Could not create destination for name: " + subject + " for subscription: " + subscription, e ); - } - if ( destination == null ) { - throw new ServletException( "No destination could be found for name: " + subject + " for subscription: " + subscription ); - } - - // #### at this point we may wish to create a thread pool of multiple threads - // #### each consuming from the same 
Destination in parallel - - try { - String selector = subscription.getSelector(); - if ( selector != null && selector.length() > 0 ) { - log( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector ); - - messenger.addListener( destination, selector, listener ); - } - else { - log( "Subscribing to messenger: " + name + " destination: " + subject ); - - messenger.addListener( destination, listener ); - } - } - catch (JMSException e) { - handleJMSException( "Could not subscribe to destination:" + destination + " for subscription: " + subscription, e ); - } - } - - /** Destrorys all current MBOs in this web application */ - protected void destroyMBOs() throws ServletException { - SubscriptionList list = getSubscriptionList(); - if ( list != null ) { - for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) { - Subscription subscription = (Subscription) iter.next(); - destroyMBOs( subscription ); - } - } - } - - protected void destroyMBOs( Subscription subscription ) throws ServletException { - // lets unsubscribe first - String name = subscription.getConnection(); - Messenger messenger = getMessenger( name ); - MessageListener listener = subscription.getMessageListener(); - if ( messenger != null && listener != null ) { - Destination destination = null; - String subject = subscription.getSubject(); - if ( subject == null || subject.length() == 0 ) { - log( "No destination defined for subscription: " + subscription ); - } - else { - try { - destination = messenger.getDestination( subject ); - if ( destination == null ) { - log( "No destination could be found for name: " + subject + " for subscription: " + subscription ); - } - } - catch (JMSException e) { - log( "Could not create destination for name: " + subject + " for subscription: " + subscription, e ); - } - } - if ( destination != null ) { - try { - String selector = subscription.getSelector(); - if ( selector != null && selector.length() > 0 ) { - 
messenger.removeListener( destination, selector, listener ); - } - else { - messenger.removeListener( destination, listener ); - } - } - catch (JMSException e) { - log( "Could not unsubscribe to destination:" + destination + " for subscription: " + subscription, e ); - } - } - } - - // now lets destrory the MBO - if ( listener instanceof MessageDrivenObject ) { - MessageDrivenObject mdo = (MessageDrivenObject) listener; - mdo.destroy(); - } - } - protected MessengerManager createMessengerManager() throws ServletException { String config = getURLResource( KEY_CONNECTIONS, "The Messenger connections XML deployment document" ); 1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/SubscriptionManager.java Index: SubscriptionManager.java =================================================================== /* * Copyright (C) The Apache Software Foundation. All rights reserved. * * This software is published under the terms of the Apache Software License * version 1.1, a copy of which has been included with this distribution in * the LICENSE file. 
* * $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $ */ package org.apache.commons.messagelet; import java.util.Iterator; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.JMSException; import javax.servlet.ServletContext; import javax.servlet.ServletException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.messagelet.model.Subscription; import org.apache.commons.messagelet.model.SubscriptionList; import org.apache.commons.messenger.Messenger; import org.apache.commons.messenger.MessengerManager; import org.apache.commons.messenger.tool.StopWatchMessageListener;

/**
 * <p><code>SubscriptionManager</code> creates and removes the JMS subscriptions
 * described by a {@link SubscriptionList}, initialising and destroying any
 * Message Driven Objects (MDOs) along the way. It holds the shared subscription
 * logic so that the same code can be driven by different front ends.</p>
 *
 * <p>A subscription is consumed either by registering its MessageListener with
 * the Messenger or, when the subscription defines a {@link ConsumerThread},
 * by starting that thread.</p>
 *
 * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
 * @version $Revision: 1.12 $
 */
public class SubscriptionManager {

    /** Logger */
    private static final Log log = LogFactory.getLog(SubscriptionManager.class);

    /** The JMS connections */
    private MessengerManager manager;

    /** The JMS Subscriptions */
    private SubscriptionList subscriptionList;

    /** The context passed into MDOs */
    private ServletContext servletContext;

    /** Should we use a stopwatch to output performance metrics */
    private boolean useStopWatch = false;

    public SubscriptionManager() {
    }

    /**
     * Subscribes every Subscription in the configured SubscriptionList.
     * If no list has been configured a warning is logged and nothing is done,
     * which mirrors the way {@link #unsubscribe()} treats a missing list.
     *
     * @throws JMSException if a subscription is incomplete or the JMS provider fails
     * @throws ServletException if an MDO fails to initialise
     */
    protected void subscribe() throws JMSException, ServletException {
        SubscriptionList list = getSubscriptionList();
        if ( list == null ) {
            log.warn( "No SubscriptionList has been configured so there is nothing to subscribe to" );
            return;
        }
        for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
            Subscription subscription = (Subscription) iter.next();
            subscribe( subscription );
        }
    }

    /**
     * Validates the given subscription, initialises its listener if it is an MDO
     * and then starts consuming messages for it.
     *
     * @param subscription the subscription to activate
     * @throws JMSException if the messenger, destination or listener is missing,
     *      or if the JMS provider fails
     * @throws ServletException if an MDO fails to initialise
     */
    public void subscribe( Subscription subscription ) throws JMSException, ServletException {
        String name = subscription.getConnection();
        Messenger messenger = getMessenger( name );
        if ( messenger == null ) {
            throw new JMSException( "No such Messenger called: " + name + " for subscription: " + subscription );
        }

        String subject = subscription.getSubject();
        if ( subject == null || subject.length() == 0 ) {
            throw new JMSException( "No destination defined for subscription: " + subscription );
        }

        Destination destination = messenger.getDestination( subject );
        if ( destination == null ) {
            throw new JMSException( "No destination could be found for name: " + subject + " for subscription: " + subscription );
        }

        MessageListener listener = subscription.getMessageListener();
        if ( listener == null ) {
            throw new JMSException( "No MessageListener is defined for subscription: " + subscription );
        }

        // if its an MDO then initialise it!
        if ( listener instanceof MessageDrivenObject ) {
            MessageDrivenObject mdo = (MessageDrivenObject) listener;
            if ( mdo instanceof MessengerMDO ) {
                MessengerMDO messengerMDO = (MessengerMDO) mdo;
                messengerMDO.setMessenger( messenger );
                messengerMDO.setMessengerManager( getMessengerManager() );
            }
            mdo.init( getServletContext() );
        }

        listener = wrapInStopWatch( listener );

        String selector = subscription.getSelector();

        ConsumerThread thread = subscription.getConsumerThread();
        if ( thread != null ) {
            // the thread pulls messages itself, so the listener is handed to the
            // thread rather than being registered with the Messenger
            log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector + " with: " + thread );

            thread.setMessenger( messenger );
            thread.setDestination( destination );
            thread.setSelector( selector );
            thread.setListener( listener );
            thread.start();
        }
        else {
            if ( selector != null && selector.length() > 0 ) {
                log.info( "Subscribing to messenger: " + name + " destination: " + subject + " selector: " + selector );

                messenger.addListener( destination, selector, listener );
            }
            else {
                log.info( "Subscribing to messenger: " + name + " destination: " + subject );

                messenger.addListener( destination, listener );
            }

            log.info( "Subscribed with listener: " + listener );
        }
    }

    /**
     * Unsubscribes every Subscription in the configured SubscriptionList.
     * Does nothing if no list has been configured.
     *
     * @throws JMSException if the Messenger for a subscription cannot be looked up
     * @throws ServletException declared for symmetry with subscribe()
     */
    public void unsubscribe() throws JMSException, ServletException {
        SubscriptionList list = getSubscriptionList();
        if ( list != null ) {
            for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
                Subscription subscription = (Subscription) iter.next();
                unsubscribe( subscription );
            }
        }
    }

    /**
     * Stops consuming messages for the given subscription and then destroys
     * its listener if it is an MDO.
     *
     * <p>If the subscription uses a ConsumerThread the thread is asked to stop;
     * it finishes once its current receive/process cycle returns. Otherwise the
     * listener is removed from the Messenger. Problems removing the listener are
     * logged rather than thrown so that the MDO is still destroyed.</p>
     *
     * @param subscription the subscription to deactivate
     * @throws JMSException if the Messenger cannot be looked up
     * @throws ServletException declared for symmetry with subscribe()
     */
    public void unsubscribe( Subscription subscription ) throws JMSException, ServletException {
        // lets unsubscribe first
        String name = subscription.getConnection();
        Messenger messenger = getMessenger( name );
        MessageListener listener = subscription.getMessageListener();

        ConsumerThread thread = subscription.getConsumerThread();
        if ( thread != null ) {
            // subscribe() started this thread instead of calling addListener(),
            // so there is no registered listener to remove: stop the thread instead
            thread.setShouldStop( true );
        }
        else if ( messenger != null && listener != null ) {
            removeListener( messenger, listener, subscription );
        }

        // now lets destroy the MDO
        if ( listener instanceof MessageDrivenObject ) {
            MessageDrivenObject mdo = (MessageDrivenObject) listener;
            mdo.destroy();
        }
    }

    // Properties
    //-------------------------------------------------------------------------

    /**
     * Returns the MessengerManager holding the JMS connections.
     * @return MessengerManager or null if none has been set
     */
    public MessengerManager getMessengerManager() throws JMSException {
        return manager;
    }

    /**
     * Sets the MessengerManager holding the JMS connections.
     * @param manager The MessengerManager to set
     */
    public void setMessengerManager(MessengerManager manager) {
        this.manager = manager;
    }

    /**
     * Returns the subscriptionList.
     * @return SubscriptionList
     */
    public SubscriptionList getSubscriptionList() {
        return subscriptionList;
    }

    /**
     * Sets the subscriptionList.
     * @param subscriptionList The subscriptionList to set
     */
    public void setSubscriptionList(SubscriptionList subscriptionList) {
        this.subscriptionList = subscriptionList;
    }

    /**
     * Returns the servletContext.
     * @return ServletContext
     */
    public ServletContext getServletContext() {
        return servletContext;
    }

    /**
     * Sets the servletContext.
     * @param servletContext The servletContext to set
     */
    public void setServletContext(ServletContext servletContext) {
        this.servletContext = servletContext;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Allows the MessageListener to be wrapped inside a stop watch message listener if required
     */
    protected MessageListener wrapInStopWatch( MessageListener listener ) {
        if ( useStopWatch ) {
            return new StopWatchMessageListener( listener );
        }
        return listener;
    }

    /**
     * Looks up the Messenger with the given name.
     *
     * @param name the name of the Messenger (the subscription's connection)
     * @return the Messenger returned by the MessengerManager for that name
     * @throws JMSException if no MessengerManager has been set yet or the lookup fails
     */
    protected Messenger getMessenger(String name) throws JMSException {
        MessengerManager messengerManager = getMessengerManager();
        if ( messengerManager == null ) {
            // report a checked, descriptive error rather than a NullPointerException
            throw new JMSException( "No MessengerManager has been initialized yet" );
        }
        return messengerManager.getMessenger( name );
    }

    /**
     * Removes the listener of the given subscription from the Messenger,
     * logging (not throwing) any problem resolving the destination or
     * unsubscribing from it.
     */
    private void removeListener( Messenger messenger, MessageListener listener, Subscription subscription ) {
        // NOTE(review): subscribe() registers the result of wrapInStopWatch(); if
        // useStopWatch is ever enabled the wrapper, not this listener, is what
        // was registered - confirm removeListener() still matches in that case
        Destination destination = null;
        String subject = subscription.getSubject();
        if ( subject == null || subject.length() == 0 ) {
            log.error( "No destination defined for subscription: " + subscription );
        }
        else {
            try {
                destination = messenger.getDestination( subject );
                if ( destination == null ) {
                    log.error( "No destination could be found for name: " + subject + " for subscription: " + subscription );
                }
            }
            catch (JMSException e) {
                log.error( "Could not create destination for name: " + subject + " for subscription: " + subscription, e );
            }
        }
        if ( destination != null ) {
            try {
                String selector = subscription.getSelector();
                if ( selector != null && selector.length() > 0 ) {
                    messenger.removeListener( destination, selector, listener );
                }
                else {
                    messenger.removeListener( destination, listener );
                }
            }
            catch (JMSException e) {
                log.error( "Could not unsubscribe to destination:" + destination + " for subscription: " + subscription, e );
            }
        }
    }
}
1.1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ConsumerThread.java Index: ConsumerThread.java =================================================================== /* * Copyright (C) The Apache Software Foundation. All rights reserved. * * This software is published under the terms of the Apache Software License * version 1.1, a copy of which has been included with this distribution in * the LICENSE file.
* * $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $ */ package org.apache.commons.messagelet; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.JMSException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.messenger.Messenger; /** * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS messages * using a receive() method on Messenger and then process the message. * This class is a good base class when implementing some kind of transactional processing of * JMS messages * * @author <a href="mailto:jstrachan@;apache.org">James Strachan</a> * @version $Revision: 1.12 $ */ public class ConsumerThread extends Thread { /** Logger */ private static final Log log = LogFactory.getLog(ConsumerThread.class); private MessageConsumer consumer; private Messenger messenger; private Destination destination; private String selector; private MessageListener listener; private boolean shouldStop; public ConsumerThread() { setName("Consumer" + getName()); } /** * Starts all the JMS connections and consumes JMS messages, * passing them onto the MessageListener and Message Driven Objects */ public void run() { if (log.isDebugEnabled()) { log.debug( "Starting consumer thread: " + getName()); } try { startConsumer(); } catch (JMSException e) { log.error("Failed to start consumer thread: " + e, e); setShouldStop(true); } while (! 
isShouldStop()) { startTransaction(); try { Message message = receive(); if (log.isTraceEnabled()) { log.trace( "Found: " + message ); } if (message != null) { processMessage(message); commitTransaction(); } else { cancelTransaction(); } } catch (Exception e) { rollbackTransaction(e); } } try { stopConsumer(); } catch (JMSException e) { log.error("Failed to stop consuming messages: " + e, e); } } // Properties //------------------------------------------------------------------------- /** * Returns the destination. * @return Destination */ public Destination getDestination() { return destination; } /** * Returns the listener. * @return MessageListener */ public MessageListener getListener() { return listener; } /** * Returns the messenger. * @return Messenger */ public Messenger getMessenger() { return messenger; } /** * Returns the selector. * @return String */ public String getSelector() { return selector; } /** * Returns the shouldStop. * @return boolean */ public boolean isShouldStop() { return shouldStop; } /** * Sets the destination. * @param destination The destination to set */ public void setDestination(Destination destination) { this.destination = destination; } /** * Sets the listener. * @param listener The listener to set */ public void setListener(MessageListener listener) { this.listener = listener; } /** * Sets the messenger. * @param messenger The messenger to set */ public void setMessenger(Messenger messenger) { this.messenger = messenger; } /** * Sets the selector. * @param selector The selector to set */ public void setSelector(String selector) { this.selector = selector; } /** * Sets the shouldStop. 
* @param shouldStop The shouldStop to set */ public void setShouldStop(boolean shouldStop) { this.shouldStop = shouldStop; } // Implementation methods //------------------------------------------------------------------------- /** * Starts consuming messages */ protected void startConsumer() throws JMSException { consumer = createConsumer(); } /** * Stops consuming messages */ protected void stopConsumer() throws JMSException { consumer.close(); } /** * Factory method to create a new MessageConsumer */ protected MessageConsumer createConsumer() throws JMSException { String selector = getSelector(); if (selector != null) { return getMessenger().createConsumer(getDestination(), selector); } else { return getMessenger().createConsumer(getDestination()); } } /** * Strategy method to consume a message using a receive() kind of method. * @return the message or null if a message could not be found after waiting for * some period of time. */ private Message receive() throws JMSException { return getConsumer().receive(); } /** * Strategy method to process a given message. * By default this will just invoke the MessageListener */ protected void processMessage(Message message) throws JMSException { MessageListener listener = getListener(); if (listener != null) { listener.onMessage(message); } } /** * Strategy method to represent the code required to start * a transaction. */ protected void startTransaction() { } /** * Strategy method to represent the code required to commit * a transaction. */ protected void commitTransaction() throws Exception { } /** * Strategy method to represent the code required to rollback * a transaction. */ protected void rollbackTransaction(Exception e) { } /** * Strategy method to represent the code required to cancel * a transaction. * This is called when a message is not received. 
*/ protected void cancelTransaction() throws Exception { } /** * @erturn the consumer of messages */ protected MessageConsumer getConsumer() { return consumer; } } 1.2 +10 -0 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/SubscriptionDigester.java Index: SubscriptionDigester.java =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/SubscriptionDigester.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- SubscriptionDigester.java 14 Aug 2002 14:36:12 -0000 1.1 +++ SubscriptionDigester.java 21 Oct 2002 20:31:28 -0000 1.2 @@ -26,6 +26,7 @@ private String bridgeClass = "org.apache.commons.messagelet.BridgeMDO"; private String distributeBridgeClass = "org.apache.commons.messagelet.DistributeBridgeMDO"; private String stopWatchClass = "org.apache.commons.messenger.tool.StopWatchMessageListener"; + private String consumerThreadClass = "org.apache.commons.messagelet.ConsumerThread"; public SubscriptionDigester() { } @@ -58,12 +59,21 @@ addCallMethod( "subscriptions/subscription/servlet", "setServlet", 0); + path = "subscriptions/subscription/consumerThread"; + addObjectCreate( path, consumerThreadClass, "className" ); + addSetProperties( path ); + addSetNext( path, "setConsumerThread", + consumerThreadClass + ); + path = "subscriptions/subscription/listener"; addObjectCreate( path, listenerClass, "className" ); addSetProperties( path ); addSetNext( path, "setMessageListener", "javax.jms.MessageListener" ); + + path = "subscriptions/subscription/stopWatch"; addObjectCreate( path, stopWatchClass, "className" ); 1.2 +22 -1 jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/Subscription.java Index: Subscription.java =================================================================== RCS file: 
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/Subscription.java,v retrieving revision 1.1 retrieving revision 1.2 diff -u -r1.1 -r1.2 --- Subscription.java 14 Aug 2002 14:36:12 -0000 1.1 +++ Subscription.java 21 Oct 2002 20:31:28 -0000 1.2 @@ -13,6 +13,8 @@ import javax.jms.JMSException; import javax.jms.MessageListener; +import org.apache.commons.messagelet.ConsumerThread; + import org.apache.commons.messenger.Messenger; import org.apache.commons.messenger.MessengerManager; @@ -38,6 +40,9 @@ /** Holds value of property servlet. */ private String servlet; + + /** should a ConsumerThread be used to consume these messages */ + private ConsumerThread consumerThread; public Subscription() { } @@ -114,6 +119,22 @@ } + /** + * Returns the consumerThread. + * @return ConsumerThread + */ + public ConsumerThread getConsumerThread() { + return consumerThread; + } + + /** + * Sets the consumerThread. + * @param consumerThread The consumerThread to set + */ + public void setConsumerThread(ConsumerThread consumerThread) { + this.consumerThread = consumerThread; + } + /** Outputs a debugging string */ public String toString() { StringBuffer buffer = new StringBuffer( super.toString() ); @@ -135,5 +156,5 @@ } return buffer.toString(); } - + } 1.4 +2 -2 jakarta-commons-sandbox/messenger/maven.xml Index: maven.xml =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/maven.xml,v retrieving revision 1.3 retrieving revision 1.4 diff -u -r1.3 -r1.4 --- maven.xml 7 Oct 2002 10:38:23 -0000 1.3 +++ maven.xml 21 Oct 2002 20:31:28 -0000 1.4 @@ -36,13 +36,13 @@ </java> </goal> - <goal name="demo.send" prereqs="set.classpath" + <goal name="demo:send" prereqs="set.classpath" description="Sends a message for processing by an MDO"> <java classname="org.apache.commons.messenger.tool.Producer" fork="yes"> <classpath refid="jms.classpath"/> <arg value="queue"/> - <arg value="echo.queue"/> 
+ <arg value="echo.queue2"/> <arg value="src/conf/sampleMessage.txt"/> <sysproperty key="org.apache.commons.messenger" value="${messenger.xml}"/> </java> 1.10 +5 -0 jakarta-commons-sandbox/messenger/src/conf/subscribe.xml Index: subscribe.xml =================================================================== RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/conf/subscribe.xml,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- subscribe.xml 7 Oct 2002 10:38:23 -0000 1.9 +++ subscribe.xml 21 Oct 2002 20:31:28 -0000 1.10 @@ -6,6 +6,11 @@ <listener className="org.apache.commons.messenger.LoggingMDO"/> </subscription> + <subscription connection="queue" subject="echo.queue2"> + <consumerThread/> + <listener className="org.apache.commons.messenger.LoggingMDO"/> + </subscription> + <!-- bridge to another JMS provider --> <subscription connection="queue" subject="my.input2" selector="b='12'"> <bridge outputConnection="queue" outputSubject="my.output"/>
-- To unsubscribe, e-mail: <mailto:commons-dev-unsubscribe@jakarta.apache.org> For additional commands, e-mail: <mailto:commons-dev-help@jakarta.apache.org>