Author: aidan Date: Wed Feb 11 15:17:19 2009 New Revision: 743357 URL: http://svn.apache.org/viewvc?rev=743357&view=rev Log: QPID-430: Fix message age alerting so that it works on queues which are otherwise inactive.
AMQQueue, VirtualHost, MockAMQQueue: change name of removeExpiredIfNoSubscribers to checkMessageStatus. AMQQueueMBean: remove unthrown exception. SimpleAMQQueue: add notification checks to checkMessageStatus, remove catch for JMException which checkForNotification no longer throws. NullApplicationRegistry: set small housekeeping check period so that it runs frequently and tests don't need to sleep for excessive periods of time. AMQQueueAlertTest: remove subsequent send, notification alerts shouldn't depend on queue activity. Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=743357&r1=743356&r2=743357&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Wed Feb 11 15:17:19 2009 @@ -149,9 +149,11 @@ long clearQueue(StoreContext storeContext) throws AMQException; - - - void removeExpiredIfNoSubscribers() throws AMQException; + /** + * Checks the status of messages on the queue, purging expired ones, firing age related alerts etc. 
+ * @throws AMQException + */ + void checkMessageStatus() throws AMQException; Set<NotificationCheck> getNotificationChecks(); Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=743357&r1=743356&r2=743357&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Wed Feb 11 15:17:19 2009 @@ -246,7 +246,7 @@ /** * Checks if there is any notification to be send to the listeners */ - public void checkForNotification(AMQMessage msg) throws AMQException, JMException + public void checkForNotification(AMQMessage msg) throws AMQException { final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=743357&r1=743356&r2=743357&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed Feb 11 15:17:19 2009 @@ -423,17 +423,9 @@ deliverAsync(); } - try - { - _managedObject.checkForNotification(entry.getMessage()); - } - catch (JMException e) - { - throw new AMQException("Unable to get notification from manage queue: " + e, e); - } - + _managedObject.checkForNotification(entry.getMessage()); + return entry; - } private void deliverToSubscription(final Subscription sub, final QueueEntry entry) @@ 
-1431,7 +1423,8 @@ } } - public void removeExpiredIfNoSubscribers() throws AMQException + @Override + public void checkMessageStatus() throws AMQException { final StoreContext storeContext = new StoreContext(); @@ -1443,10 +1436,12 @@ QueueEntry node = queueListIterator.getNode(); if (!node.isDeleted() && node.expired() && node.acquire()) { - node.discard(storeContext); + } + else + { + _managedObject.checkForNotification(node.getMessage()); } - } } Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?rev=743357&r1=743356&r2=743357&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Wed Feb 11 15:17:19 2009 @@ -49,7 +49,8 @@ _logger.info("Initialising NullApplicationRegistry"); _configuration.addProperty("store.class", "org.apache.qpid.server.store.MemoryMessageStore"); - + _configuration.addProperty("housekeeping.expiredMessageCheckPeriod", "200"); + Properties users = new Properties(); users.put("guest", "guest"); Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=743357&r1=743356&r2=743357&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed Feb 11 15:17:19 2009 @@ -208,7 +208,7 @@ try { - 
q.removeExpiredIfNoSubscribers(); + q.checkMessageStatus(); } catch (AMQException e) { Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java?rev=743357&r1=743356&r2=743357&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java Wed Feb 11 15:17:19 2009 @@ -167,9 +167,6 @@ // Ensure message sits on queue long enough to age. Thread.sleep(MAX_MESSAGE_AGE * 2); - sendMessages(1, MAX_MESSAGE_SIZE); - assertTrue(_queueMBean.getMessageCount() == 2); - Notification lastNotification = _queueMBean.getLastNotification(); assertNotNull(lastNotification); Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=743357&r1=743356&r2=743357&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original) +++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Wed Feb 11 15:17:19 2009 @@ -277,7 +277,8 @@ return 0; //To change body of implemented methods use File | Settings | File Templates. } - public void removeExpiredIfNoSubscribers() throws AMQException + @Override + public void checkMessageStatus() throws AMQException { //To change body of implemented methods use File | Settings | File Templates. 
} --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org