jstrachan    2002/10/21 13:31:28

  Modified:    messenger/src/java/org/apache/commons/messagelet Main.java
                        ManagerServlet.java
               messenger/src/java/org/apache/commons/messagelet/model
                        SubscriptionDigester.java Subscription.java
               messenger maven.xml
               messenger/src/conf subscribe.xml
  Added:       messenger/src/java/org/apache/commons/messagelet
                        SubscriptionManager.java ConsumerThread.java
  Log:
  Refactored the subscription mechanism into a reusable SubscriptionManager. This 
allows code to be shared between the standalone messagelet engine and the 
ManagerServlet.
  
  Also added an option for a <consumerThread> element, which provides a plugin point for 
performing transactional message consumption.
  
  Revision  Changes    Path
  1.6       +5 -73     
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/Main.java
  
  Index: Main.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/Main.java,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- Main.java 7 Oct 2002 10:38:23 -0000       1.5
  +++ Main.java 21 Oct 2002 20:31:27 -0000      1.6
  @@ -92,11 +92,12 @@
       public void run() throws Exception {
   
           // force lazy construction
  -        getMessengerManager();
  +        SubscriptionManager subscriber = new SubscriptionManager();
  +        subscriber.setMessengerManager( getMessengerManager() );
  +        subscriber.setSubscriptionList( createSubscriptionList() );
  +        subscriber.setServletContext( getServletContext() );
           
  -        // load the subscriptions....
  -        subscriptionList = createSubscriptionList();
  -        subscribe( subscriptionList );
  +        subscriber.subscribe();
           
           // now lets start all the connections...
           for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); ) {
  @@ -152,65 +153,6 @@
       
       // Implementation methods
       //-------------------------------------------------------------------------    
  -    protected void subscribe( SubscriptionList list ) throws JMSException, 
ServletException {
  -        for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
  -            Subscription subscription = (Subscription) iter.next();
  -            subscribe( subscription );
  -        }
  -    }
  -    
  -    protected void subscribe( Subscription subscription ) throws JMSException, 
ServletException {
  -        String name = subscription.getConnection();
  -        Messenger messenger = getMessenger( name );
  -        if ( messenger == null ) {
  -            throw new JMSException( "No such Messenger called: " + name + " for 
subscription: " + subscription );
  -        }
  -        MessageListener listener = subscription.getMessageListener();
  -        if ( listener == null ) {
  -            throw new JMSException( "No MessageListener is defined for 
subscription: " + subscription );
  -        }
  -        
  -        // if its an MDO the initialise it!
  -        if ( listener instanceof MessageDrivenObject ) {
  -            MessageDrivenObject mdo = (MessageDrivenObject) listener;
  -            if ( mdo instanceof MessengerMDO ) {
  -                MessengerMDO messengerMDO = (MessengerMDO) mdo;
  -                messengerMDO.setMessenger( messenger );
  -                messengerMDO.setMessengerManager( getMessengerManager() );
  -            }
  -            mdo.init( getServletContext() );
  -        }
  -        
  -        listener = wrapInStopWatch( listener );
  -        
  -        String subject = subscription.getSubject();
  -        if ( subject == null || subject.length() == 0 ) {
  -            throw new JMSException( "No destination defined for subscription: " + 
subscription );
  -        }
  -        
  -        Destination destination = messenger.getDestination( subject );
  -        if ( destination == null ) {
  -            throw new JMSException( "No destination could be found for name: " + 
subject + " for subscription: " + subscription );
  -        }
  -
  -        // #### at this point we may wish to create a thread pool of multiple 
threads
  -        // #### each consuming from the same Destination in parallel
  -        
  -        String selector = subscription.getSelector();
  -        if ( selector != null && selector.length() > 0 ) {
  -            log.info( "Subscribing to messenger: " + name + " destination: " + 
subject + " selector: " + selector );
  -            
  -            messenger.addListener( destination, selector, listener );
  -        }
  -        else {
  -            log.info( "Subscribing to messenger: " + name + " destination: " + 
subject );
  -            
  -            messenger.addListener( destination, listener );
  -        }
  -        
  -        log.info( "Subscribed with listener: " + listener );
  -    }
  -    
       protected MessengerManager createMessengerManager() throws JMSException {
           String config = connectionsConfig;
           
  @@ -252,16 +194,6 @@
           return null;
       }
       
  -    /**
  -     * Allows the MessageListener to be wrapped inside a stop watch message 
listener if required 
  -     */
  -    protected MessageListener wrapInStopWatch( MessageListener listener ) {
  -        if ( useStopWatch ) {
  -            return new StopWatchMessageListener( listener );
  -        }
  -        return listener;
  -    }
  -        
       /**
        * This method blocks the current thread indefinitely until the JVM is 
terminated.
        */
  
  
  
  1.14      +51 -200   
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ManagerServlet.java
  
  Index: ManagerServlet.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ManagerServlet.java,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -r1.13 -r1.14
  --- ManagerServlet.java       13 Aug 2002 07:19:10 -0000      1.13
  +++ ManagerServlet.java       21 Oct 2002 20:31:27 -0000      1.14
  @@ -40,14 +40,12 @@
     */
   public class ManagerServlet extends GenericServlet {
   
  -    private static final String KEY_MESSENGER_MANAGER = 
MessengerManager.class.getName();
  -    private static final String KEY_SUBSCRIPTIONLIST = 
SubscriptionList.class.getName();
  -    private static final String KEY_CONNECTIONS = "connections";
  -    private static final String KEY_SUBSCRIPTIONS = "subscriptions";
  -
       /** Should HTTP servlets be used or generic servlets. If true then JSP can be 
dispatched to easily */
       private static final boolean USE_HTTP_SERVLETS = true;
   
  +    private static final String KEY_CONNECTIONS = "connections";
  +    private static final String KEY_SUBSCRIPTIONS = "subscriptions";
  +
       /** 
        * Whether exceptions occurring during subscriptions on startup should 
        * terminate the initialization
  @@ -57,40 +55,15 @@
       public ManagerServlet() {
       }
       
  -    public MessengerManager getMessengerManager() {
  -        return (MessengerManager) getServletContext().getAttribute( 
KEY_MESSENGER_MANAGER );
  -    }
  -    
  -    public void setMessengerManager(MessengerManager messengerManager) {
  -        if ( messengerManager == null ) {
  -            getServletContext().removeAttribute( KEY_MESSENGER_MANAGER );
  -        }
  -        else {
  -            getServletContext().setAttribute( KEY_MESSENGER_MANAGER, 
messengerManager );
  +    public SubscriptionManager getSubscriptionManager() {
  +        SubscriptionManager answer = (SubscriptionManager) 
getServletContext().getAttribute( "subscriptionManager" );
  +        if (answer == null) {
  +            answer = new SubscriptionManager();
  +            getServletContext().setAttribute( "subscriptionManager", answer );
           }
  +        return answer;
       }
  -    
  -    public SubscriptionList getSubscriptionList() {
  -        return (SubscriptionList) getServletContext().getAttribute( 
KEY_SUBSCRIPTIONLIST );
  -    }
  -    
  -    public void setSubscriptionList(SubscriptionList subscriptionList) {
  -        if ( subscriptionList == null ) {
  -            getServletContext().removeAttribute( KEY_SUBSCRIPTIONLIST );
  -        }
  -        else {
  -            getServletContext().setAttribute( KEY_SUBSCRIPTIONLIST, 
subscriptionList );
  -        }
  -    }
  -    
  -    public Messenger getMessenger(String name) throws ServletException {
  -        MessengerManager messengerManager = getMessengerManager();
  -        if ( messengerManager == null ) {
  -            throw new ServletException( "No MessengerManager has been initialized 
yet" );
  -        }
  -        return messengerManager.getMessenger( name );
  -    }
  -    
  +
       // Servlet methods
       //-------------------------------------------------------------------------    
       
  @@ -101,44 +74,56 @@
           }
           
           // ensure Messenger is initialised
  -        MessengerManager manager = getMessengerManager();
  -        if ( manager == null ) {
  -            manager = createMessengerManager();
  -            setMessengerManager( manager );
  -        
  -            // load the subscriptions....
  -            SubscriptionList list = createSubscriptionList();
  -            subscribe( list );
  -            setSubscriptionList( list );
  -            
  -            // now lets start all the connections...
  -            for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); ) {
  -                String name = (String) iter.next();
  -                Messenger messenger = manager.getMessenger( name );
  -                try {
  -                    messenger.getConnection().start();
  -                }
  -                catch (JMSException e) {
  -                    log( "Caught exception trying to start messenger: " + name + ". 
Exception: " + e, e );
  +        try {
  +            SubscriptionManager subscriber = getSubscriptionManager();
  +            MessengerManager manager = subscriber.getMessengerManager();
  +            if ( manager == null ) {
  +                manager = createMessengerManager();
  +    
  +                subscriber.setMessengerManager( manager );
  +                subscriber.setSubscriptionList( createSubscriptionList() );
  +                subscriber.setServletContext( getServletContext() );
  +                
  +                // load the subscriptions....
  +                subscriber.subscribe();
  +                
  +                // now lets start all the connections...
  +                for (Iterator iter = manager.getMessengerNames(); iter.hasNext(); ) 
{
  +                    String name = (String) iter.next();
  +                    Messenger messenger = manager.getMessenger( name );
  +                    try {
  +                        messenger.getConnection().start();
  +                    }
  +                    catch (JMSException e) {
  +                        log( "Caught exception trying to start messenger: " + name 
+ ". Exception: " + e, e );
  +                    }
                   }
               }
           }
  +        catch (JMSException e) {
  +            throw new ServletException("Failed to initialize: " + e, e );
  +        }
       }
       
       public void destroy() {
           try {
  -            destroyMBOs();
  +            getSubscriptionManager().unsubscribe();
           }
  -        catch (ServletException e) {
  +        catch (Exception e) {
               log( "Failed to destrory the MBOs: " + e, e );
           }
  -        
  -        MessengerManager manager = getMessengerManager();
  -        if ( manager != null ) {
  -            log( "Closing the Messenger connections" );
  -            manager.close();
  +
  +        try {        
  +            MessengerManager manager = 
getSubscriptionManager().getMessengerManager();
  +            if ( manager != null ) {
  +                log( "Closing the Messenger connections" );
  +                manager.close();
  +            }
           }
  -        setMessengerManager( null );
  +        catch (Exception e) {
  +            log( "Failed to close the Messenger Manager: " + e, e );
  +        }
  +        getSubscriptionManager().setMessengerManager( null );
       }
       
       public void service(ServletRequest request, ServletResponse response) throws 
ServletException {
  @@ -162,140 +147,6 @@
       
       // Implementation methods
       //-------------------------------------------------------------------------    
  -    protected void subscribe( SubscriptionList list ) throws ServletException {
  -        for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); ) {
  -            Subscription subscription = (Subscription) iter.next();
  -            subscribe( subscription );
  -        }
  -    }
  -    
  -    protected void subscribe( Subscription subscription ) throws ServletException {
  -        String name = subscription.getConnection();
  -        Messenger messenger = getMessenger( name );
  -        if ( messenger == null ) {
  -            throw new ServletException( "No such Messenger called: " + name + " for 
subscription: " + subscription );
  -        }
  -        MessageListener listener = null;
  -        String servlet = subscription.getServlet();
  -        if ( servlet != null ) {
  -            if ( USE_HTTP_SERVLETS ) {
  -                listener = new MessageHttpServletDispatcher( servlet );
  -            }
  -            else {
  -                listener = new MessageServletDispatcher( servlet );
  -            }
  -        }
  -        else {
  -            listener = subscription.getMessageListener();
  -            if ( listener == null ) {
  -                throw new ServletException( "No MessageListener is defined for 
subscription: " + subscription );
  -            }
  -        }
  -        
  -        // if its an MDO the initialise it!
  -        if ( listener instanceof MessageDrivenObject ) {
  -            MessageDrivenObject mdo = (MessageDrivenObject) listener;
  -            if ( mdo instanceof MessengerMDO ) {
  -                MessengerMDO messengerMDO = (MessengerMDO) mdo;
  -                messengerMDO.setMessenger( messenger );
  -                messengerMDO.setMessengerManager( getMessengerManager() );
  -            }
  -            mdo.init( getServletContext() );
  -        }
  -        
  -        String subject = subscription.getSubject();
  -        if ( subject == null || subject.length() == 0 ) {
  -            throw new ServletException( "No destination defined for subscription: " 
+ subscription );
  -        }
  -        
  -        Destination destination = null;        
  -        try {
  -            destination = messenger.getDestination( subject );
  -        }
  -        catch (JMSException e) {
  -            handleJMSException( "Could not create destination for name: " + subject 
+ " for subscription: " + subscription, e );
  -        }
  -        if ( destination == null ) {
  -            throw new ServletException( "No destination could be found for name: " 
+ subject + " for subscription: " + subscription );
  -        }
  -
  -        // #### at this point we may wish to create a thread pool of multiple 
threads
  -        // #### each consuming from the same Destination in parallel
  -        
  -        try {
  -            String selector = subscription.getSelector();
  -            if ( selector != null && selector.length() > 0 ) {
  -                log( "Subscribing to messenger: " + name + " destination: " + 
subject + " selector: " + selector );
  -                
  -                messenger.addListener( destination, selector, listener );
  -            }
  -            else {
  -                log( "Subscribing to messenger: " + name + " destination: " + 
subject );
  -                
  -                messenger.addListener( destination, listener );
  -            }
  -        }
  -        catch (JMSException e) {
  -            handleJMSException( "Could not subscribe to destination:" + destination 
+ " for subscription: " + subscription, e );
  -        }
  -    }
  -    
  -    /** Destrorys all current MBOs in this web application */
  -    protected void destroyMBOs() throws ServletException {
  -        SubscriptionList list = getSubscriptionList();
  -        if ( list != null ) {
  -            for (Iterator iter = list.getSubscriptions().iterator(); 
iter.hasNext(); ) {
  -                Subscription subscription = (Subscription) iter.next();
  -                destroyMBOs( subscription );
  -            }
  -        }
  -    }
  -    
  -    protected void destroyMBOs( Subscription subscription ) throws ServletException 
{
  -        // lets unsubscribe first
  -        String name = subscription.getConnection();
  -        Messenger messenger = getMessenger( name );
  -        MessageListener listener = subscription.getMessageListener();
  -        if ( messenger != null && listener != null ) {
  -            Destination destination = null;        
  -            String subject = subscription.getSubject();
  -            if ( subject == null || subject.length() == 0 ) {
  -                log( "No destination defined for subscription: " + subscription );
  -            }
  -            else {
  -                try {
  -                    destination = messenger.getDestination( subject );
  -                    if ( destination == null ) {
  -                        log( "No destination could be found for name: " + subject + 
" for subscription: " + subscription );
  -                    }
  -                }
  -                catch (JMSException e) {
  -                    log( "Could not create destination for name: " + subject + " 
for subscription: " + subscription, e );
  -                }
  -            }
  -            if ( destination != null ) {
  -                try {
  -                    String selector = subscription.getSelector();
  -                    if ( selector != null && selector.length() > 0 ) {
  -                        messenger.removeListener( destination, selector, listener );
  -                    }
  -                    else {
  -                        messenger.removeListener( destination, listener );
  -                    }
  -                }
  -                catch (JMSException e) {
  -                    log( "Could not unsubscribe to destination:" + destination + " 
for subscription: " + subscription, e );
  -                }
  -            }
  -        }
  -        
  -        // now lets destrory the MBO
  -        if ( listener instanceof MessageDrivenObject ) {
  -            MessageDrivenObject mdo = (MessageDrivenObject) listener;
  -            mdo.destroy();
  -        }
  -    }
  -    
       protected MessengerManager createMessengerManager() throws ServletException {
           String config = getURLResource( KEY_CONNECTIONS, "The Messenger connections 
XML deployment document" );
   
  
  
  
  1.1                  
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/SubscriptionManager.java
  
  Index: SubscriptionManager.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   * 
   * $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $
   */
  package org.apache.commons.messagelet;
  
  import java.util.Iterator;
  
  import javax.jms.Destination;
  import javax.jms.Message;
  import javax.jms.MessageConsumer;
  import javax.jms.MessageListener;
  import javax.jms.JMSException;
  
  import javax.servlet.ServletContext;
  import javax.servlet.ServletException;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  
  import org.apache.commons.messagelet.model.Subscription;
  import org.apache.commons.messagelet.model.SubscriptionList;
  import org.apache.commons.messenger.Messenger;
  import org.apache.commons.messenger.MessengerManager;
  import org.apache.commons.messenger.tool.StopWatchMessageListener;
  
  /** 
   * <p><code>SubscriptionManager</code> is a simple command line program that will
   * create a number of subscriptions and consume messages using just regular 
   * MDO and MessageListener classes.
   *
   * @author <a href="mailto:jstrachan@;apache.org">James Strachan</a>
   * @version $Revision: 1.12 $
   */
  public class SubscriptionManager {
  
      /** Logger */
      private static final Log log = LogFactory.getLog(SubscriptionManager.class);
  
      /** The JMS connections */    
      private MessengerManager manager;
  
      /** The JMS Subscriptions */
      private SubscriptionList subscriptionList;
  
      /** The context passed into MDOs */
      private ServletContext servletContext;
          
      /** Should we use a stopwatch to output performance metrics */
      private boolean useStopWatch = false;
  
      
      public SubscriptionManager() {
      }
  
  
      protected void subscribe() throws JMSException, ServletException {
          for (Iterator iter = getSubscriptionList().getSubscriptions().iterator(); 
iter.hasNext(); ) {
              Subscription subscription = (Subscription) iter.next();
              subscribe( subscription );
          }
      }
      
      public void subscribe( Subscription subscription ) throws JMSException, 
ServletException{
          String name = subscription.getConnection();
          Messenger messenger = getMessenger( name );
          if ( messenger == null ) {
              throw new JMSException( "No such Messenger called: " + name + " for 
subscription: " + subscription );
          }
          String subject = subscription.getSubject();
          if ( subject == null || subject.length() == 0 ) {
              throw new JMSException( "No destination defined for subscription: " + 
subscription );
          }
          
          Destination destination = messenger.getDestination( subject );
          if ( destination == null ) {
              throw new JMSException( "No destination could be found for name: " + 
subject + " for subscription: " + subscription );
          }
  
          MessageListener listener = subscription.getMessageListener();
          if ( listener == null ) {
              throw new JMSException( "No MessageListener is defined for subscription: 
" + subscription );
          }
          
          // if its an MDO the initialise it!
          if ( listener instanceof MessageDrivenObject ) {
              MessageDrivenObject mdo = (MessageDrivenObject) listener;
              if ( mdo instanceof MessengerMDO ) {
                  MessengerMDO messengerMDO = (MessengerMDO) mdo;
                  messengerMDO.setMessenger( messenger );
                  messengerMDO.setMessengerManager( getMessengerManager() );
              }
              mdo.init( getServletContext() );
          }
  
          listener = wrapInStopWatch( listener );
  
          String selector = subscription.getSelector();
  
          ConsumerThread thread = subscription.getConsumerThread();
          if (thread != null) {
              log.info( "Subscribing to messenger: " + name + " destination: " + 
subject + " selector: " + selector + " with: " + thread );
              
              thread.setMessenger(messenger);
              thread.setDestination(destination);
              thread.setSelector(selector);
              thread.setListener(listener);
              thread.start();
          }
          else {
              if ( selector != null && selector.length() > 0 ) {
                  log.info( "Subscribing to messenger: " + name + " destination: " + 
subject + " selector: " + selector );
                  
                  messenger.addListener( destination, selector, listener );
              }
              else {
                  log.info( "Subscribing to messenger: " + name + " destination: " + 
subject );
                  
                  messenger.addListener( destination, listener );
              }
              
              log.info( "Subscribed with listener: " + listener );
          }
      }
  
  
      
      public void unsubscribe() throws JMSException, ServletException {
          SubscriptionList list = getSubscriptionList();
          if ( list != null ) {
              for (Iterator iter = list.getSubscriptions().iterator(); iter.hasNext(); 
) {
                  Subscription subscription = (Subscription) iter.next();
                  unsubscribe( subscription );
              }
          }
      }
      
      public void unsubscribe( Subscription subscription ) throws JMSException, 
ServletException {
          // lets unsubscribe first
          String name = subscription.getConnection();
          Messenger messenger = getMessenger( name );
          
          MessageListener listener = subscription.getMessageListener();
          if ( messenger != null && listener != null ) {
              Destination destination = null;        
              String subject = subscription.getSubject();
              if ( subject == null || subject.length() == 0 ) {
                  log.error( "No destination defined for subscription: " + 
subscription );
              }
              else {
                  try {
                      destination = messenger.getDestination( subject );
                      if ( destination == null ) {
                          log.error( "No destination could be found for name: " + 
subject + " for subscription: " + subscription );
                      }
                  }
                  catch (JMSException e) {
                      log.error( "Could not create destination for name: " + subject + 
" for subscription: " + subscription, e );
                  }
              }
              if ( destination != null ) {
                  try {
                      String selector = subscription.getSelector();
                      if ( selector != null && selector.length() > 0 ) {
                          messenger.removeListener( destination, selector, listener );
                      }
                      else {
                          messenger.removeListener( destination, listener );
                      }
                  }
                  catch (JMSException e) {
                      log.error( "Could not unsubscribe to destination:" + destination 
+ " for subscription: " + subscription, e );
                  }
              }
          }
          
          // now lets destrory the MBO
          if ( listener instanceof MessageDrivenObject ) {
              MessageDrivenObject mdo = (MessageDrivenObject) listener;
              mdo.destroy();
          }
      }
      
      
      // Properties
      //-------------------------------------------------------------------------    
  
      public MessengerManager getMessengerManager() throws JMSException {
          return manager;
      }
      
      public void setMessengerManager(MessengerManager manager) {
          this.manager = manager;
      }
  
      /**
       * Returns the subscriptionList.
       * @return SubscriptionList
       */
      public SubscriptionList getSubscriptionList() {
          return subscriptionList;
      }
  
      /**
       * Sets the subscriptionList.
       * @param subscriptionList The subscriptionList to set
       */
      public void setSubscriptionList(SubscriptionList subscriptionList) {
          this.subscriptionList = subscriptionList;
      }
      
      /**
       * Returns the servletContext.
       * @return ServletContext
       */
      public ServletContext getServletContext() {
          return servletContext;
      }
  
      /**
       * Sets the servletContext.
       * @param servletContext The servletContext to set
       */
      public void setServletContext(ServletContext servletContext) {
          this.servletContext = servletContext;
      }
      
      // Implementation methods
      //-------------------------------------------------------------------------    
      /**
       * Allows the MessageListener to be wrapped inside a stop watch message listener 
if required 
       */
      protected MessageListener wrapInStopWatch( MessageListener listener ) {
          if ( useStopWatch ) {
              return new StopWatchMessageListener( listener );
          }
          return listener;
      }
  
      protected Messenger getMessenger(String name) throws JMSException {
          return getMessengerManager().getMessenger( name );
      }
  
  }
  
  
  
  1.1                  
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/ConsumerThread.java
  
  Index: ConsumerThread.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   * 
   * $Id: ManagerServlet.java,v 1.12 2002/05/15 14:36:34 jstrachan Exp $
   */
  package org.apache.commons.messagelet;
  
  import javax.jms.Destination;
  import javax.jms.Message;
  import javax.jms.MessageConsumer;
  import javax.jms.MessageListener;
  import javax.jms.JMSException;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  
  import org.apache.commons.messenger.Messenger;
  
  /** 
   * <p><code>ConsumerThread</code> is a thread which will repeatedly consume JMS 
messages
   * using a receive() method on Messenger and then process the message.
   * This class is a good base class when implementing some kind of transactional 
processing of 
   * JMS messages
   *
   * @author <a href="mailto:jstrachan@;apache.org">James Strachan</a>
   * @version $Revision: 1.12 $
   */
  public class ConsumerThread extends Thread {
  
      /** Logger */
      private static final Log log = LogFactory.getLog(ConsumerThread.class);
  
  
      /** The consumer field; not referenced by the code visible in this section */
      private MessageConsumer consumer;
      /** The messenger set through setMessenger() */
      private Messenger messenger;
      /** The destination set through setDestination() */
      private Destination destination;
      /** The selector set through setSelector(); may be null */
      private String selector;
      /** The listener set through setListener() */
      private MessageListener listener;
      /** 
       * Stop flag polled by run(). It is volatile because it is set by another 
       * thread than the one executing run(), which must see the change.
       */
      private volatile boolean shouldStop;
      
      /**
       * Creates a consumer thread whose name is the default name assigned by 
       * Thread prefixed with "Consumer".
       */
      public ConsumerThread() {
          String defaultName = getName();
          setName("Consumer" + defaultName);
      }
  
  
      /**
       * Starts the consumer and then, until the thread is asked to stop, 
       * repeatedly starts a transaction and receives a message. A received 
       * message is processed and the transaction committed; when nothing was 
       * received the transaction is cancelled; any exception raised while 
       * receiving or processing rolls the transaction back. 
       * The consumer is always stopped before this method returns, even if an 
       * unchecked exception escapes the loop.
       */
      public void run() {
          if (log.isDebugEnabled()) {
              log.debug( "Starting consumer thread: " + getName());
          }
          try {
              startConsumer();
          }
          catch (JMSException e) {
              log.error("Failed to start consumer thread: " + e, e);
              // skip the consume loop but still fall through to stopConsumer()
              setShouldStop(true);
          }
          
          try {
              while (! isShouldStop()) {
                  startTransaction();
  
                  try {
                      Message message = receive();
  
                      if (log.isTraceEnabled()) {
                          log.trace( "Found: " + message );
                      }
                      
                      if (message != null) {
                          processMessage(message);
                          commitTransaction();
                      }
                      else {
                          cancelTransaction();
                      }
                  }
                  catch (Exception e) {
                      rollbackTransaction(e);
                  }
              }
          }
          finally {
              // release the consumer even when startTransaction() or 
              // rollbackTransaction() throws an unchecked exception
              try {
                  stopConsumer();
              }
              catch (JMSException e) {
                  log.error("Failed to stop consuming messages: " + e, e);
              }
          }
      }
      
      // Properties
      //-------------------------------------------------------------------------    
  
      /**
       * Returns the destination.
       * @return Destination
       */
      public Destination getDestination() {
          return destination;
      }
  
      /**
       * Returns the listener.
       * @return MessageListener
       */
      public MessageListener getListener() {
          return listener;
      }
  
      /**
       * Returns the messenger.
       * @return Messenger
       */
      public Messenger getMessenger() {
          return messenger;
      }
  
      /**
       * Returns the selector.
       * @return String
       */
      public String getSelector() {
          return selector;
      }
  
      /**
       * Returns the shouldStop.
       * @return boolean
       */
      public boolean isShouldStop() {
          return shouldStop;
      }
  
      /**
       * Sets the destination.
       * @param destination The destination to set
       */
      public void setDestination(Destination destination) {
          this.destination = destination;
      }
  
      /**
       * Sets the listener.
       * @param listener The listener to set
       */
      public void setListener(MessageListener listener) {
          this.listener = listener;
      }
  
      /**
       * Sets the messenger.
       * @param messenger The messenger to set
       */
      public void setMessenger(Messenger messenger) {
          this.messenger = messenger;
      }
  
      /**
       * Sets the selector.
       * @param selector The selector to set
       */
      public void setSelector(String selector) {
          this.selector = selector;
      }
  
      /**
       * Sets the shouldStop.
       * @param shouldStop The shouldStop to set
       */
      public void setShouldStop(boolean shouldStop) {
          this.shouldStop = shouldStop;
      }
  
      // Implementation methods
      //-------------------------------------------------------------------------    
      
      /**
       * Starts consuming messages        
       */    
      protected void startConsumer() throws JMSException {
          consumer = createConsumer();
      }
  
      /**
       * Stops consuming messages        
       */    
      protected void stopConsumer() throws JMSException {
          consumer.close();
      }
  
      /**
       * Factory method to create a new MessageConsumer 
       */
      protected MessageConsumer createConsumer() throws JMSException {
          String selector = getSelector();
          if (selector != null) {
              return getMessenger().createConsumer(getDestination(), selector);
          }
          else {
              return getMessenger().createConsumer(getDestination());
          }
      }
  
      /**
       * Strategy method to consume a message using a receive() kind of method.
       * @return the message or null if a message could not be found after waiting for
       * some period of time.
       */
      private Message receive() throws JMSException {
          return getConsumer().receive();
      }    
      
      /**
       * Strategy method to process a given message. 
       * By default this will just invoke the MessageListener
       */
      protected void processMessage(Message message) throws JMSException {
          MessageListener listener = getListener();
          if (listener != null) {
              listener.onMessage(message);
          }
      }
  
  
      /**
       * Strategy method to represent the code required to start
       * a transaction.
       */
      protected void startTransaction() {
      }
  
      /**
       * Strategy method to represent the code required to commit
       * a transaction.
       */
      protected void commitTransaction() throws Exception {
      }
  
      /**
       * Strategy method to represent the code required to rollback
       * a transaction.
       */
      protected void rollbackTransaction(Exception e) {
      }
  
      /**
       * Strategy method to represent the code required to cancel
       * a transaction. 
       * This is called when a message is not received.
       */
      protected void cancelTransaction() throws Exception {
      }
  
  
      /**
       * @erturn the consumer of messages 
       */
      protected MessageConsumer getConsumer() {
          return consumer;
      }
  }
  
  
  
  1.2       +10 -0     
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/SubscriptionDigester.java
  
  Index: SubscriptionDigester.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/SubscriptionDigester.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- SubscriptionDigester.java 14 Aug 2002 14:36:12 -0000      1.1
  +++ SubscriptionDigester.java 21 Oct 2002 20:31:28 -0000      1.2
  @@ -26,6 +26,7 @@
       private String bridgeClass = "org.apache.commons.messagelet.BridgeMDO";
       private String distributeBridgeClass = 
"org.apache.commons.messagelet.DistributeBridgeMDO";
       private String stopWatchClass = 
"org.apache.commons.messenger.tool.StopWatchMessageListener";
  +    private String consumerThreadClass = 
"org.apache.commons.messagelet.ConsumerThread";
       
       public SubscriptionDigester() {
       }
  @@ -58,12 +59,21 @@
   
           addCallMethod( "subscriptions/subscription/servlet", "setServlet", 0);
           
  +        path = "subscriptions/subscription/consumerThread";
  +        addObjectCreate( path, consumerThreadClass, "className" );
  +        addSetProperties( path );
  +        addSetNext( path, "setConsumerThread",
  +           consumerThreadClass
  +        );
  +
           path = "subscriptions/subscription/listener";
           addObjectCreate( path, listenerClass, "className" );
           addSetProperties( path );
           addSetNext( path, "setMessageListener",
              "javax.jms.MessageListener"
           );
  +
  +
           
           path = "subscriptions/subscription/stopWatch";
           addObjectCreate( path, stopWatchClass, "className" );
  
  
  
  1.2       +22 -1     
jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/Subscription.java
  
  Index: Subscription.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-commons-sandbox/messenger/src/java/org/apache/commons/messagelet/model/Subscription.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- Subscription.java 14 Aug 2002 14:36:12 -0000      1.1
  +++ Subscription.java 21 Oct 2002 20:31:28 -0000      1.2
  @@ -13,6 +13,8 @@
   import javax.jms.JMSException;
   import javax.jms.MessageListener;
   
  +import org.apache.commons.messagelet.ConsumerThread;
  +
   import org.apache.commons.messenger.Messenger;
   import org.apache.commons.messenger.MessengerManager;
   
  @@ -38,6 +40,9 @@
       
       /** Holds value of property servlet. */
       private String servlet;
  +
  +    /** should a ConsumerThread be used to consume these messages */
  +    private ConsumerThread consumerThread;    
       
       public Subscription() {
       }        
  @@ -114,6 +119,22 @@
       }
       
       
  +    /**
  +     * Returns the consumerThread.
  +     * @return ConsumerThread
  +     */
  +    public ConsumerThread getConsumerThread() {
  +        return consumerThread;
  +    }
  +
  +    /**
  +     * Sets the consumerThread.
  +     * @param consumerThread The consumerThread to set
  +     */
  +    public void setConsumerThread(ConsumerThread consumerThread) {
  +        this.consumerThread = consumerThread;
  +    }
  +    
       /** Outputs a debugging string */
       public String toString() {
           StringBuffer buffer = new StringBuffer( super.toString() );
  @@ -135,5 +156,5 @@
           }
           return buffer.toString();
       }   
  -    
  +
   }
  
  
  
  1.4       +2 -2      jakarta-commons-sandbox/messenger/maven.xml
  
  Index: maven.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/maven.xml,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- maven.xml 7 Oct 2002 10:38:23 -0000       1.3
  +++ maven.xml 21 Oct 2002 20:31:28 -0000      1.4
  @@ -36,13 +36,13 @@
       </java>
     </goal>
        
  -  <goal name="demo.send" prereqs="set.classpath"
  +  <goal name="demo:send" prereqs="set.classpath"
       description="Sends a message for processing by an MDO">
       
       <java classname="org.apache.commons.messenger.tool.Producer" fork="yes">
         <classpath refid="jms.classpath"/>
         <arg value="queue"/>
  -      <arg value="echo.queue"/>
  +      <arg value="echo.queue2"/>
         <arg value="src/conf/sampleMessage.txt"/>
         <sysproperty key="org.apache.commons.messenger" value="${messenger.xml}"/>
       </java>
  
  
  
  1.10      +5 -0      jakarta-commons-sandbox/messenger/src/conf/subscribe.xml
  
  Index: subscribe.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-commons-sandbox/messenger/src/conf/subscribe.xml,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -r1.9 -r1.10
  --- subscribe.xml     7 Oct 2002 10:38:23 -0000       1.9
  +++ subscribe.xml     21 Oct 2002 20:31:28 -0000      1.10
  @@ -6,6 +6,11 @@
       <listener className="org.apache.commons.messenger.LoggingMDO"/>
     </subscription>
   
  +  <subscription connection="queue" subject="echo.queue2">
  +     <consumerThread/>
  +    <listener className="org.apache.commons.messenger.LoggingMDO"/>
  +  </subscription>
  +
     <!-- bridge to another JMS provider -->
     <subscription connection="queue" subject="my.input2" selector="b='12'">
       <bridge outputConnection="queue" outputSubject="my.output"/>
  
  
  

--
To unsubscribe, e-mail:   <mailto:commons-dev-unsubscribe@jakarta.apache.org>
For additional commands, e-mail: <mailto:commons-dev-help@jakarta.apache.org>

Reply via email to