Hi,

I was trying to tidy up the Ajax code so that consumers are closed when
sessions time out (or long polls stop coming).  But the queueConsumers
map in WebClient is static and keyed only by destination, which means:

 + You can only have one consumer per queue.  I can imagine Ajax
   apps that want to hand out messages to one of many clients, and
   having multiple consumers would be a good way to do this.

 + As it stands, you don't know when the consumer is no longer needed.
   So it will live forever even if all sessions time out.

I've reworked the code so that queue consumers are associated with
the HttpSession (just as topic consumers are) and they use the common
JMS session. Unsubscribe now closes consumers, as will a long-poll
timeout and the ending of the session.

But I don't want to check it in, as I don't understand why the
consumers were static in the first place.  Some effort was put
into the static code and recovering the map from context 
attributes etc.  So I'd like to double-check that I'm not
missing something.

I've attached a patch and would appreciate any comments.

cheers




Index: activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
===================================================================
--- activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java	(revision 388492)
+++ activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java	(working copy)
@@ -55,6 +55,14 @@
  * specify a readTimeout parameter to determine how long the servlet should
  * block for.
  * 
+ * The servlet can be configured with the following init parameters:<dl>
+ * <dt>defaultReadTimeout</dt><dd>The default time in ms to wait for messages. May be overridden by a request using the 'timeout' parameter</dd>
+ * <dt>maximumReadTimeout</dt><dd>The maximum value a request may specify for the 'timeout' parameter</dd>
+ * <dt>maximumMessages</dt><dd>maximum messages to send per response</dd>
+ * <dt></dt><dd></dd>
+ * </dl>
+ *  
+ * 
  * @version $Revision: 1.1.1.1 $
  */
 public class MessageListenerServlet extends MessageServletSupport {
@@ -132,6 +140,7 @@
                     {
                         Listener listener = getListener(request);
                         Map consumerIdMap = getConsumerIdMap(request);
+                        client.closeConsumer(destination); // drop any existing consumer.
                         MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
                         
                         consumer.setAvailableListener(listener);
@@ -145,9 +154,9 @@
                         Map consumerIdMap = getConsumerIdMap(request);
                         MessageAvailableConsumer consumer = (MessageAvailableConsumer) client.getConsumer(destination);
                         
-                        // TODO should we destroy consumer on unsubscribe?
                         consumer.setAvailableListener(null);
                         consumerIdMap.remove(consumer);
+                        client.closeConsumer(destination);
                         if (log.isDebugEnabled()) {
                             log.debug("Unsubscribed: "+consumer);
                         }
@@ -189,7 +198,6 @@
             response.setHeader("Cache-Control", "no-cache");
             response.getWriter().print("<ajax-response></ajax-response>");
         }
-        // System.err.println("==");
     }
 
     /**
@@ -209,7 +217,6 @@
         catch (JMSException e) {
             throw new ServletException("JMS problem: " + e, e);
         }
-        // System.err.println("--");
     }
 
     /**
@@ -232,11 +239,12 @@
             log.debug("doMessage timeout="+timeout);
         }
         
-        Continuation continuation = null;
-        Message message = null;
-
+        Continuation continuation = ContinuationSupport.getContinuation(request, client);
         Listener listener = getListener(request);
+        if (listener!=null && continuation!=null && !continuation.isPending())
+            listener.access();
 
+        Message message = null;
         synchronized (client) {
 
             List consumers = client.getConsumers();
@@ -259,8 +267,6 @@
             // messages
 
             if (message == null) {
-                continuation = ContinuationSupport.getContinuation(request, client);
-
                 // register this continuation with our listener.
                 listener.setContinuation(continuation);
 
@@ -268,6 +274,7 @@
                 // request here).
                 continuation.suspend(timeout);
             }
+            listener.setContinuation(null);
 
             // prepare the responds
             response.setContentType("text/xml");
@@ -299,7 +306,6 @@
 
                 // Look for any available messages
                 message = consumer.receiveNoWait();
-                // System.err.println("received "+message+" from "+consumer);
                 while (message != null && messages < maximumMessages) {
                     String id = (String) consumerIdMap.get(consumer);
                     writer.print("<response id='");
@@ -314,6 +320,7 @@
 
             // Add poll message
             // writer.println("<response type='object' id='amqPoll'><ok/></response>");
+            
             writer.print("</ajax-response>");
 
             writer.flush();
@@ -386,15 +393,18 @@
      */
     private class Listener implements MessageAvailableListener {
         WebClient client;
-
+        long lastAccess;
         Continuation continuation;
 
-        List queue = new LinkedList();
-
         Listener(WebClient client) {
             this.client = client;
         }
 
+        public void access()
+        {
+            lastAccess=System.currentTimeMillis();
+        }
+        
         public void setContinuation(Continuation continuation) {
             synchronized (client) {
                 this.continuation = continuation;
@@ -408,21 +418,13 @@
                 }
                 if (continuation != null)
                     continuation.resume();
+                else if (System.currentTimeMillis()-lastAccess>2*maximumReadTimeout)
+                {
+                    client.closeConsumers();
+                }
                 continuation = null;
             }
         }
 
     }
-
-    private static void dump(Map map)
-    {
-        Iterator iter=map.entrySet().iterator();
-        while(iter.hasNext())
-        {
-            Map.Entry entry=(Map.Entry)iter.next();
-            String k=(String)entry.getKey();
-            String[] v=(String[])entry.getValue();
-            System.err.println(k+":"+(v==null?"[]":Arrays.asList(v).toString()));
-        }
-    }
 }
Index: activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
===================================================================
--- activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java	(revision 388492)
+++ activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java	(working copy)
@@ -38,6 +38,11 @@
  * there are various ways to map JMS operations to web requests
  * so we put most of the common behaviour in a reusable base class.
  *
+ * This servlet can be configured with the following init parameters <dl>
+ * <dt>topic</dt><dd>Set to 'true' if the servlet should default to using topics rather than channels</dd>
+ * <dt>destination</dt><dd>The default destination to use if one is not specified</dd>
+ * <dt></dt><dd></dd>
+ * </dl>
  * @version $Revision: 1.1.1.1 $
  */
 public abstract class MessageServletSupport extends HttpServlet {
@@ -74,7 +79,7 @@
     }
 
     protected WebClient createWebClient(HttpServletRequest request) {
-        return new WebClient(getServletContext());
+        return new WebClient();
     }
 
     public static boolean asBoolean(String param) {
@@ -99,7 +104,7 @@
     protected WebClient getWebClient(HttpServletRequest request) {
         HttpSession session = request.getSession(true);
         WebClient client = WebClient.getWebClient(session);
-        if (client == null) {
+        if (client == null || client.isClosed()) {
             client = createWebClient(request);
             session.setAttribute(WebClient.webClientAttribute, client);
         }
Index: activemq-web/src/main/java/org/apache/activemq/web/ConnectionManager.java
===================================================================
--- activemq-web/src/main/java/org/apache/activemq/web/ConnectionManager.java	(revision 388492)
+++ activemq-web/src/main/java/org/apache/activemq/web/ConnectionManager.java	(working copy)
@@ -1,49 +0,0 @@
-/**
- *
- * Copyright 2005-2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.activemq.web;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.servlet.http.HttpSessionEvent;
-import javax.servlet.http.HttpSessionListener;
-
-/**
- * Listens to sessions closing to ensure that JMS connections are
- * cleaned up nicely
- *
- * @version $Revision: 1.1.1.1 $
- */
-public class ConnectionManager implements HttpSessionListener {
-    private static final Log log = LogFactory.getLog(ConnectionManager.class);
-
-    public void sessionCreated(HttpSessionEvent event) {
-    }
-
-    public void sessionDestroyed(HttpSessionEvent event) {
-        /** TODO we can't use the session any more now!
-         WebClient client = WebClient.getWebClient(event.getSession());
-         try {
-         client.stop();
-         }
-         catch (JMSException e) {
-         log.warn("Error closing connection: " + e, e);
-         }
-         */
-    }
-}
Index: activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
===================================================================
--- activemq-web/src/main/java/org/apache/activemq/web/WebClient.java	(revision 388492)
+++ activemq-web/src/main/java/org/apache/activemq/web/WebClient.java	(working copy)
@@ -35,43 +35,44 @@
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-import javax.jms.Topic;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpSession;
 import javax.servlet.http.HttpSessionActivationListener;
+import javax.servlet.http.HttpSessionBindingEvent;
+import javax.servlet.http.HttpSessionBindingListener;
 import javax.servlet.http.HttpSessionEvent;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.MessageAvailableConsumer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 import edu.emory.mathcs.backport.java.util.concurrent.Semaphore;
 
 /**
  * Represents a messaging client used from inside a web container
  * typically stored inside a HttpSession
+ * 
+ * TODO controls to prevent DOS attacks with users requesting many consumers 
  *
  * @version $Revision: 1.1.1.1 $
  */
-public class WebClient implements HttpSessionActivationListener, Externalizable {
+public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {
     public static final String webClientAttribute = "org.apache.activemq.webclient";
     public static final String connectionFactoryAttribute = "org.apache.activemq.connectionFactory";
-    public static final String queueConsumersAttribute = "org.apache.activemq.queueConsumers";
     public static final String brokerUrlInitParam = "org.apache.activemq.brokerURL";
 
     private static final Log log = LogFactory.getLog(WebClient.class);
 
     private static transient ConnectionFactory factory;
-    private static transient Map queueConsumers;
+    
+    private transient Map consumers = new HashMap();
 
-    private transient ServletContext context;
     private transient ActiveMQConnection connection;
     private transient ActiveMQSession session;
     private transient MessageProducer producer;
-    private transient Map topicConsumers = new ConcurrentHashMap();
     private int deliveryMode = DeliveryMode.NON_PERSISTENT;
 
     private final Semaphore semaphore = new Semaphore(1);
@@ -86,26 +87,16 @@
 
 
     public static void initContext(ServletContext context) {
-        factory = initConnectionFactory(context);
-        if (factory == null) {
-            log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute);
-            factory = new ActiveMQConnectionFactory("vm://localhost");
-            context.setAttribute(connectionFactoryAttribute, factory);
-        }
-        queueConsumers = initQueueConsumers(context);
+        initConnectionFactory(context);
     }
 
     /**
-     * Only called by serialization
      */
     public WebClient() {
+        if (factory==null)
+            throw new IllegalStateException("initContext(ServletContext) not called");
     }
 
-    public WebClient(ServletContext context) {
-        this.context = context;
-        initContext(context);
-    }
-
     
     public int getDeliveryMode() {
         return deliveryMode;
@@ -117,28 +108,52 @@
     }
 
 
-    public void start() throws JMSException {
+    public synchronized void closeConsumers() 
+    {
+        for (Iterator it = consumers.values().iterator(); it.hasNext();) {
+            MessageConsumer consumer = (MessageConsumer) it.next();
+            it.remove();
+            try{
+                consumer.setMessageListener(null);
+                if (consumer instanceof MessageAvailableConsumer)
+                    ((MessageAvailableConsumer)consumer).setAvailableListener(null);
+                consumer.close();
+            }
+            catch(JMSException e)
+            {
+                e.printStackTrace();
+            }
+        }
     }
 
-    public void stop() throws JMSException {
-        System.out.println("Closing the WebClient!!! " + this);
-        
+    public synchronized void close() {
         try {
-            connection.close();
+            closeConsumers();
+            if (connection!=null)
+                connection.close();
+        } catch (JMSException e) {
+            throw new RuntimeException(e);
         }
         finally {
             producer = null;
             session = null;
             connection = null;
-            topicConsumers.clear();
+            if (consumers!=null)
+                consumers.clear();
+            consumers=null;
         }
     }
+    
+    public boolean isClosed()
+    {
+        return consumers==null;
+    }
 
     public void writeExternal(ObjectOutput out) throws IOException {
     }
 
     public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        topicConsumers = new HashMap();
+        consumers = new HashMap();
     }
 
     public void send(Destination destination, Message message) throws JMSException {
@@ -167,103 +182,75 @@
         return connection;
     }
 
-    public void sessionWillPassivate(HttpSessionEvent event) {
-        try {
-            stop();
-        }
-        catch (JMSException e) {
-            log.warn("Could not close connection: " + e, e);
-        }
-    }
-
-    public void sessionDidActivate(HttpSessionEvent event) {
-        // lets update the connection factory from the servlet context
-        context = event.getSession().getServletContext();
-        initContext(context);
-    }
-
-    public static Map initQueueConsumers(ServletContext context) {
-        Map answer = (Map) context.getAttribute(queueConsumersAttribute);
-        if (answer == null) {
-            answer = new HashMap();
-            context.setAttribute(queueConsumersAttribute, answer);
-        }
-        return answer;
-    }
-
-
-    public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
-        ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
-        if (connectionFactory == null) {
-            String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam);
-
+    public static synchronized void initConnectionFactory(ServletContext servletContext) {
+        if (factory==null)
+            factory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
+        if (factory == null) {
+            String brokerURL = servletContext.getInitParameter(brokerUrlInitParam);
+            
             servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
-
+            
             if (brokerURL == null) {
                 brokerURL = "vm://localhost";
             }
+            
+            ActiveMQConnectionFactory amqfactory = new ActiveMQConnectionFactory(brokerURL);
+            factory = amqfactory;
+           
+            servletContext.setAttribute(connectionFactoryAttribute, factory);
+        }
+    }
 
+    public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
+        return getConsumer(destination,true);
+    }
 
-            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
-            connectionFactory = factory;
-            servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
+    public synchronized MessageConsumer getConsumer(Destination destination, boolean create) throws JMSException {
+        
+        MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
+        if (create && consumer == null) {
+            consumer = getSession().createConsumer(destination);
+            consumers.put(destination, consumer);
         }
-        return connectionFactory;
+        return consumer;
     }
 
-    public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
-        if (destination instanceof Topic) {
-            MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination);
-            if (consumer == null) {
-                consumer = getSession().createConsumer(destination);
-                topicConsumers.put(destination, consumer);
-            }
-            return consumer;
+    public synchronized void closeConsumer(Destination destination) throws JMSException {
+        MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
+        if (consumer != null) {
+            consumers.remove(destination);
+            consumer.setMessageListener(null);
+            if (consumer instanceof MessageAvailableConsumer)
+                ((MessageAvailableConsumer)consumer).setAvailableListener(null);
+            consumer.close();
         }
-        else {
-            synchronized (queueConsumers) {
-                SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination);
-                if (pair == null) {
-                    pair = createSessionConsumerPair(destination);
-                    queueConsumers.put(destination, pair);
-                }
-                return pair.consumer;
-            }
-        }
     }
     
     public synchronized List getConsumers()
     {
-        ArrayList list = new ArrayList(topicConsumers.size()+queueConsumers.size());
-        
-        // TODO check this double synchronization on queue but not on topics
-        synchronized (queueConsumers) {
-            for (Iterator it = queueConsumers.values().iterator(); it.hasNext();) {
-                SessionConsumerPair pair = (SessionConsumerPair) it.next();
-                list.add(pair.consumer);
-            }
-        }
-        list.addAll(topicConsumers.values());
-        return list;
+        return new ArrayList(consumers.values());
     }
 
     protected ActiveMQSession createSession() throws JMSException {
         return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
     }
 
-    protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException {
-        SessionConsumerPair answer = new SessionConsumerPair();
-        answer.session = createSession();
-        answer.consumer = answer.session.createConsumer(destination);
-        return answer;
+
+    public Semaphore getSemaphore() {
+        return semaphore;
     }
 
-    protected static class SessionConsumerPair {
-        public Session session;
-        public MessageConsumer consumer;
+    public void sessionWillPassivate(HttpSessionEvent event) {
+        close();
     }
 
-    public Semaphore getSemaphore() {
-        return semaphore;
+    public void sessionDidActivate(HttpSessionEvent event) {
     }
+
+    public void valueBound(HttpSessionBindingEvent event) {
+    }
+
+    public void valueUnbound(HttpSessionBindingEvent event) {
+        close();
+    }
 }
Index: activemq-web/src/main/webapp/WEB-INF/web.xml
===================================================================
--- activemq-web/src/main/webapp/WEB-INF/web.xml	(revision 388492)
+++ activemq-web/src/main/webapp/WEB-INF/web.xml	(working copy)
@@ -38,12 +38,6 @@
         <description>Whether we should include an embedded broker or not</description>
     </context-param>
 
-    <!-- connection manager -->
-    <listener>
-        <listener-class>org.apache.activemq.web.ConnectionManager</listener-class>
-    </listener>
-
-
     <!-- servlet mappings -->
     
     <!-- the subscription REST servlet -->

Reply via email to