Author: kwall Date: Wed Nov 30 13:57:47 2011 New Revision: 1208434 URL: http://svn.apache.org/viewvc?rev=1208434&view=rev Log: QPID-3601: Occasional failure to delete queue on broker shutdown
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java?rev=1208434&r1=1208433&r2=1208434&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/SlowConsumerDetectionQueueConfiguration.java Wed Nov 30 13:57:47 2011 @@ -87,6 +87,19 @@ public class SlowConsumerDetectionQueueC @Override public void validateConfiguration() throws ConfigurationException { + PluginManager pluginManager; + try + { + pluginManager = ApplicationRegistry.getInstance().getPluginManager(); + } + catch (IllegalStateException ise) + { + // We see this happen during shutdown due to asynchronous reconfig performed IO threads + // running at the same time as the shutdown handler. + _policyPlugin = null; + return; + } + if (!containsPositiveLong("messageAge") && !containsPositiveLong("depth") && !containsPositiveLong("messageCount")) @@ -96,8 +109,6 @@ public class SlowConsumerDetectionQueueC } SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class.getName()); - - PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getSlowConsumerPlugins(); if (policyConfig == null) Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java?rev=1208434&r1=1208433&r2=1208434&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java Wed Nov 30 13:57:47 2011 @@ -26,11 +26,13 @@ import org.apache.qpid.framing.AMQShortS import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionConfiguration; import org.apache.qpid.server.configuration.plugins.SlowConsumerDetectionQueueConfiguration; import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin; +import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.plugins.Plugin; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.plugins.logging.SlowConsumerDetectionMessages; +import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; public class SlowConsumerDetection extends VirtualHostHouseKeepingPlugin { @@ -56,7 +58,7 @@ public class SlowConsumerDetection exten /** * Configures the slow consumer disconnect plugin by adding a listener to each exchange on this - * cirtual host to record all the configured queues in a cache for processing by the housekeeping + * virtual host to record all the configured queues in a cache for processing by the housekeeping * thread. * * @see Plugin#configure(ConfigurationPlugin) @@ -65,9 +67,10 @@ public class SlowConsumerDetection exten { _config = (SlowConsumerDetectionConfiguration) config; _listener = new ConfiguredQueueBindingListener(getVirtualHost().getName()); - for (AMQShortString exchangeName : getVirtualHost().getExchangeRegistry().getExchangeNames()) + final ExchangeRegistry exchangeRegistry = getVirtualHost().getExchangeRegistry(); + for (AMQShortString exchangeName : exchangeRegistry.getExchangeNames()) { - getVirtualHost().getExchangeRegistry().getExchange(exchangeName).addBindingListener(_listener); + exchangeRegistry.getExchange(exchangeName).addBindingListener(_listener); } } @@ -87,11 +90,21 @@ public class SlowConsumerDetection exten try { - SlowConsumerDetectionQueueConfiguration config = + final SlowConsumerDetectionQueueConfiguration config = q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class.getName()); if (checkQueueStatus(q, config)) { - config.getPolicy().performPolicy(q); + final SlowConsumerPolicyPlugin policy = config.getPolicy(); + if (policy == null) + { + // We would only expect to see this during shutdown + _logger.warn("No slow consumer policy for queue " + q.getName()); + } + else + { + policy.performPolicy(q); + } + } } catch (Exception e) @@ -126,7 +139,10 @@ public class SlowConsumerDetection exten { if (config != null) { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + if (_logger.isInfoEnabled()) + { + _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + } int count = q.getMessageCount(); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org