Author: aidan
Date: Wed Feb 11 15:17:19 2009
New Revision: 743357

URL: http://svn.apache.org/viewvc?rev=743357&view=rev
Log:
QPID-430: Fix message age alerting so that it works on queues which are 
otherwise inactive.

AMQQueue, VirtualHost, MockAMQQueue: change name of 
removeExpiredIfNoSubscribers to checkMessageStatus.
AMQQueueMBean: remove unthrown exception
SimpleAMQQueue: add notification checks to checkMessageStatus, remove catch for 
JMException which checkForNotification no longer throws.
NullApplicationRegistry: set small housekeeping check period so that it runs 
frequently and tests don't need to sleep for excessive periods of time
AMQQueueAlertTest: remove subsequent send; notification alerts shouldn't depend 
on queue activity.

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
    
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=743357&r1=743356&r2=743357&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
 Wed Feb 11 15:17:19 2009
@@ -149,9 +149,11 @@
 
     long clearQueue(StoreContext storeContext) throws AMQException;
 
-
-
-    void removeExpiredIfNoSubscribers() throws AMQException;
+    /**
+     * Checks the status of messages on the queue, purging expired ones, 
firing age related alerts etc.
+     * @throws AMQException
+     */
+    void checkMessageStatus() throws AMQException;
 
     Set<NotificationCheck> getNotificationChecks();
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=743357&r1=743356&r2=743357&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
 Wed Feb 11 15:17:19 2009
@@ -246,7 +246,7 @@
     /**
      * Checks if there is any notification to be send to the listeners
      */
-    public void checkForNotification(AMQMessage msg) throws AMQException, 
JMException
+    public void checkForNotification(AMQMessage msg) throws AMQException
     {
 
         final Set<NotificationCheck> notificationChecks = 
_queue.getNotificationChecks();

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=743357&r1=743356&r2=743357&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Wed Feb 11 15:17:19 2009
@@ -423,17 +423,9 @@
             deliverAsync();
         }
 
-        try
-        {
-            _managedObject.checkForNotification(entry.getMessage());
-        }
-        catch (JMException e)
-        {
-            throw new AMQException("Unable to get notification from manage 
queue: " + e, e);
-        }
-
+        _managedObject.checkForNotification(entry.getMessage());
+        
         return entry;
-
     }
 
     private void deliverToSubscription(final Subscription sub, final 
QueueEntry entry)
@@ -1431,7 +1423,8 @@
         }
     }
 
-    public void removeExpiredIfNoSubscribers() throws AMQException
+    @Override
+    public void checkMessageStatus() throws AMQException
     {
 
         final StoreContext storeContext = new StoreContext();
@@ -1443,10 +1436,12 @@
             QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && node.expired() && node.acquire())
             {
-
                 node.discard(storeContext);
+            } 
+            else 
+            {
+                _managedObject.checkForNotification(node.getMessage());
             }
-
         }
 
     }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=743357&r1=743356&r2=743357&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
 Wed Feb 11 15:17:19 2009
@@ -49,7 +49,8 @@
         _logger.info("Initialising NullApplicationRegistry");
         
         _configuration.addProperty("store.class", 
"org.apache.qpid.server.store.MemoryMessageStore");
-
+        _configuration.addProperty("housekeeping.expiredMessageCheckPeriod", 
"200");
+        
         Properties users = new Properties();
 
         users.put("guest", "guest");

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=743357&r1=743356&r2=743357&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
 Wed Feb 11 15:17:19 2009
@@ -208,7 +208,7 @@
 
                         try
                         {
-                            q.removeExpiredIfNoSubscribers();
+                            q.checkMessageStatus();
                         }
                         catch (AMQException e)
                         {

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=743357&r1=743356&r2=743357&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
 Wed Feb 11 15:17:19 2009
@@ -167,9 +167,6 @@
         // Ensure message sits on queue long enough to age.
         Thread.sleep(MAX_MESSAGE_AGE * 2);
 
-        sendMessages(1, MAX_MESSAGE_SIZE);
-        assertTrue(_queueMBean.getMessageCount() == 2);
-
         Notification lastNotification = _queueMBean.getLastNotification();
         assertNotNull(lastNotification);
 

Modified: 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=743357&r1=743356&r2=743357&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
 Wed Feb 11 15:17:19 2009
@@ -277,7 +277,8 @@
         return 0;  //To change body of implemented methods use File | Settings 
| File Templates.
     }
 
-    public void removeExpiredIfNoSubscribers() throws AMQException
+    @Override
+    public void checkMessageStatus() throws AMQException
     {
         //To change body of implemented methods use File | Settings | File 
Templates.
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to