Author: kwall
Date: Wed Nov 30 13:57:47 2011
New Revision: 1208434

URL: http://svn.apache.org/viewvc?rev=1208434&view=rev
Log:
QPID-3601: Occasional failure to delete queue on broker shutdown

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java?rev=1208434&r1=1208433&r2=1208434&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java
 Wed Nov 30 13:57:47 2011
@@ -87,6 +87,19 @@ public class SlowConsumerDetectionQueueC
     @Override
     public void validateConfiguration() throws ConfigurationException
     {
+        PluginManager pluginManager;
+        try
+        {
+            pluginManager = 
ApplicationRegistry.getInstance().getPluginManager();
+        }
+        catch (IllegalStateException ise)
+        {
+            // We see this happen during shutdown due to asynchronous reconfig 
performed by IO threads
+            // running at the same time as the shutdown handler.
+            _policyPlugin = null;
+            return;
+        }
+
         if (!containsPositiveLong("messageAge") &&
             !containsPositiveLong("depth") &&
             !containsPositiveLong("messageCount"))
@@ -96,8 +109,6 @@ public class SlowConsumerDetectionQueueC
         }
 
         SlowConsumerDetectionPolicyConfiguration policyConfig = 
getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName());
-
-        PluginManager pluginManager = 
ApplicationRegistry.getInstance().getPluginManager();
         Map<String, SlowConsumerPolicyPluginFactory> factories = 
pluginManager.getSlowConsumerPlugins();
 
         if (policyConfig == null)

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java?rev=1208434&r1=1208433&r2=1208434&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
 Wed Nov 30 13:57:47 2011
@@ -26,11 +26,13 @@ import org.apache.qpid.framing.AMQShortS
 import 
org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration;
 import 
org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration;
 import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import 
org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages;
+import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
 
 public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin
 {
@@ -56,7 +58,7 @@ public class SlowConsumerDetection exten
 
     /**
      * Configures the slow consumer disconnect plugin by adding a listener to 
each exchange on this
-     * cirtual host to record all the configured queues in a cache for 
processing by the housekeeping
+     * virtual host to record all the configured queues in a cache for 
processing by the housekeeping
      * thread.
      * 
      * @see Plugin#configure(ConfigurationPlugin)
@@ -65,9 +67,10 @@ public class SlowConsumerDetection exten
     {        
         _config = (SlowConsumerDetectionConfiguration) config;
         _listener = new 
ConfiguredQueueBindingListener(getVirtualHost().getName());
-        for (AMQShortString exchangeName : 
getVirtualHost().getExchangeRegistry().getExchangeNames())
+        final ExchangeRegistry exchangeRegistry = 
getVirtualHost().getExchangeRegistry();
+        for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames())
         {
-            
getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener);
+            
exchangeRegistry.getExchange(exchangeName).addBindingListener(_listener);
         }
     }
     
@@ -87,11 +90,21 @@ public class SlowConsumerDetection exten
             
             try
             {
-                SlowConsumerDetectionQueueConfiguration config =
+                final SlowConsumerDetectionQueueConfiguration config =
                     
q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName());
                 if (checkQueueStatus(q, config))
                 {
-                    config.getPolicy().performPolicy(q);
+                    final SlowConsumerPolicyPlugin policy = config.getPolicy();
+                    if (policy == null)
+                    {
+                        // We would only expect to see this during shutdown
+                        _logger.warn("No slow consumer policy for queue " + 
q.getName());
+                    }
+                    else
+                    {
+                        policy.performPolicy(q);
+                    }
+
                 }
             }
             catch (Exception e)
@@ -126,7 +139,10 @@ public class SlowConsumerDetection exten
     {
         if (config != null)
         {
-            _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + 
config);
+            if (_logger.isInfoEnabled())
+            {
+                _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + 
config);
+            }
 
             int count = q.getMessageCount();
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to