Author: ritchiem Date: Fri May 7 15:11:56 2010 New Revision: 942108 URL: http://svn.apache.org/viewvc?rev=942108&view=rev Log: QPID-1447 : Exclude SCD testing until complete
Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java Modified: qpid/trunk/qpid/java/test-profiles/Excludes Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF Fri May 7 15:11:56 2010 @@ -0,0 +1,22 @@ +Manifest-Version: 1.0 +Bundle-ManifestVersion: 2 +Bundle-Name: Qpid Slow Consumer Detection +Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true +Bundle-Version: 1.0.0 +Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator +Import-Package: org.osgi.framework, + org.apache.qpid.server.configuration.plugin, + org.apache.qpid.server.configuration, + org.apache.qpid.server.virtualhost.plugin, + org.apache.qpid.server.virtualhost, + org.apache.qpid.server.queue, + org.apache.qpid.server.registry, + org.apache.qpid.server.plugins, + org.apache.qpid, + org.apache.log4j, + org.apache.commons.configuration +Bundle-RequiredExecutionEnvironment: JavaSE-1.6 +Bundle-ClassPath: . +Bundle-ActivationPolicy: lazy +Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework" + Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml Fri May 7 15:11:56 2010 @@ -0,0 +1,32 @@ +<!-- + - + - Licensed to the Apache Software Foundation (ASF) under one +nn - or more contributor license agreements. See the NOTICE file + -n distributed with this work for additional information + - regarding copyright ownership. The ASF licenses this file + - to you under the Apache License, Version 2.0 (the + - "License"); you may not use this file except in compliance + - with the License. You may obtain a copy of the License at + - + - http://www.apache.org/licenses/LICENSE-2.0 + - + - Unless required by applicable law or agreed to in writing, + - software distributed under the License is distributed on an + - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + - KIND, either express or implied. See the License for the + - specific language governing permissions and limitations + - under the License. + - + --> +<project name="Slow Consumer Disconnect" default="build"> + + <property name="module.depends" value="common broker broker-plugins"/> + <property name="module.test.depends" value="broker/test systests client management/common"/> + <property name="module.manifest" value="MANIFEST.MF"/> + <property name="module.plugin" value="true"/> + + <import file="../../../module.xml"/> + + <target name="bundle" depends="bundle-tasks"/> + +</project> Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java Fri May 7 15:11:56 2010 @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; + +import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory; +import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory; +import org.osgi.framework.BundleActivator; +import org.osgi.framework.BundleContext; + +/** + * Activator that loads our OSGi bundles for the Slow Consumer Detection plugin. + * + * This includes Configuration + * + * @author ritchiem + */ +public class Activator implements BundleActivator +{ + public void start(BundleContext ctx) throws Exception + { + if (null != ctx) + { + ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(), null); + ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(), null); + ctx.registerService(ConfigurationPluginFactory.class.getName(), new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(), null); + ctx.registerService(VirtualHostPluginFactory.class.getName(), new SlowConsumerDetection.SlowConsumerFactory(), null); + ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(), new TopicDeletePolicy.DeletePolicyFactory(), null); + } + } + + public void stop(BundleContext ctx) throws Exception + { + // no need to do anything here, osgi will unregister the service for us + } + +} Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java Fri May 7 15:11:56 2010 @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.virtualhost.plugin.VirtualHostPlugin; +import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory; + +class SlowConsumerDetection implements VirtualHostPlugin +{ + Logger _logger = Logger.getLogger(SlowConsumerDetection.class); + private VirtualHost _virtualhost; + private SlowConsumerDetectionConfiguration _config; + private SlowConsumerPolicyPlugin _policy; + + public static class SlowConsumerFactory implements VirtualHostPluginFactory + { + public VirtualHostPlugin newInstance(VirtualHost vhost) + { + return new SlowConsumerDetection(vhost); + } + } + + public SlowConsumerDetection(VirtualHost vhost) + { + _virtualhost = vhost; + _config = vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class); + if (_config == null) + { + throw new IllegalArgumentException("Plugin has not been configured"); + } + + } + + public void run() + { + _logger.info("Starting the SlowConsumersDetection job"); + try + { + for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues()) + { + _logger.debug("Checking consumer status for queue: " + + q.getName()); + try + { + SlowConsumerDetectionQueueConfiguration config = + q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class); + + if (checkQueueStatus(q, config)) + { + config.getPolicy().performPolicy(q); + } + } + catch (Exception e) + { + _logger.error("Exception in SlowConsumersDetection " + + "for queue: " + + q.getNameShortString().toString(), e); + //Don't throw exceptions as this will stop the + // house keeping task from running. + } + } + _logger.info("SlowConsumersDetection job completed."); + } + catch (Exception e) + { + _logger.error("SlowConsumersDetection job failed: " + e.getMessage(), e); + } + catch (Error e) + { + _logger.error("SlowConsumersDetection job failed with error: " + e.getMessage(), e); + } + } + + public long getDelay() + { + return _config.getDelay(); + } + + public String getTimeUnit() + { + return _config.getTimeUnit(); + } + + /** + * Check the depth,messageSize,messageAge,messageCount values for this q + * + * @param q the queue to check + * @param config + * + * @return true if the queue has reached a threshold. + */ + private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) + { + + _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); + + return config != null && + (q.getMessageCount() >= config.getMessageCount() || + q.getQueueDepth() >= config.getDepth() || + q.getOldestMessageArrivalTime() >= config.getMessageAge()); + } +} Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java Fri May 7 15:11:56 2010 @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory; + +import java.util.concurrent.TimeUnit; + +public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin +{ + public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public String[] getParentPaths() + { + return new String[]{"virtualhosts.virtualhost.slow-consumer-detection"}; + } + } + + public String[] getElementsProcessed() + { + return new String[]{"delay", + "timeunit"}; + } + + public long getDelay() + { + return _configuration.getLong("delay", 10); + } + + public String getTimeUnit() + { + return _configuration.getString("timeunit", TimeUnit.SECONDS.toString()); + } + + @Override + public void setConfiguration(String path, Configuration configuration) throws ConfigurationException + { + super.setConfiguration(path, configuration); + + System.out.println("Configured SCDC"); + System.out.println("Delay:" + getDelay()); + System.out.println("TimeUnit:" + getTimeUnit()); + } +} Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java Fri May 7 15:11:56 2010 @@ -0,0 +1,81 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory; + +import java.util.List; + +public class SlowConsumerDetectionPolicyConfiguration extends ConfigurationPlugin +{ + + public static class SlowConsumerDetectionPolicyConfigurationFactory + implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, + Configuration config) + throws ConfigurationException + { + SlowConsumerDetectionPolicyConfiguration slowConsumerConfig = + new SlowConsumerDetectionPolicyConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public String[] getParentPaths() + { + return new String[]{ + "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", + "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", + "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"}; + } + } + + public String[] getElementsProcessed() + { + // NOTE: the use of '@name]' rather than '[...@name]' this appears to be + // a bug in commons configuration. + //fixme - Simple test case needs raised and JIRA raised on Commons + return new String[]{"@name]", "options"}; + } + + public String getPolicyName() + { + return _configuration.getString("[...@name]"); + } + + public String getOption(String option) + { + List options = _configuration.getList("options.opti...@name]"); + + if (options != null && options.contains(option)) + { + return _configuration.getString("options.opti...@value]" + + "(" + options.indexOf(option) + ")"); + } + + return null; + } +} Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java Fri May 7 15:11:56 2010 @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin; +import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory; +import org.apache.qpid.server.plugins.PluginManager; +import org.apache.qpid.server.registry.ApplicationRegistry; + +import java.util.Map; + +public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin +{ + private SlowConsumerPolicyPlugin _policyPlugin; + + public static class SlowConsumerDetectionQueueConfigurationFactory implements ConfigurationPluginFactory + { + public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException + { + SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new SlowConsumerDetectionQueueConfiguration(); + slowConsumerConfig.setConfiguration(path, config); + return slowConsumerConfig; + } + + public String[] getParentPaths() + { + return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection", + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", + "virtualhosts.virtualhost.topics.slow-consumer-detection", + "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"}; + } + + } + + public String[] getElementsProcessed() + { + return new String[]{"messageAge", + "depth", + "messageCount"}; + } + + public int getMessageAge() + { + return (int) getConfigurationValue("messageAge"); + } + + public long getDepth() + { + return getConfigurationValue("depth"); + } + + public long getMessageCount() + { + return getConfigurationValue("messageCount"); + } + + public SlowConsumerPolicyPlugin getPolicy() + { + return _policyPlugin; + } + + @Override + public void setConfiguration(String path, Configuration configuration) throws ConfigurationException + { + super.setConfiguration(path, configuration); + + SlowConsumerDetectionPolicyConfiguration policyConfig = getConfiguration(SlowConsumerDetectionPolicyConfiguration.class); + + PluginManager pluginManager = ApplicationRegistry.getInstance().getPluginManager(); + Map<String, SlowConsumerPolicyPluginFactory> factories = + pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); + + if (_logger.isDebugEnabled()) + { + _logger.debug("Configured SCDQC"); + _logger.debug("Age:" + getMessageAge()); + _logger.debug("Depth:" + getDepth()); + _logger.debug("Count:" + getMessageCount()); + _logger.debug("Policy:" + policyConfig.getPolicyName()); + _logger.debug("Available factories:" + factories); + } + + SlowConsumerPolicyPluginFactory pluginFactory = factories.get(policyConfig.getPolicyName().toLowerCase()); + + if (pluginFactory == null) + { + throw new ConfigurationException("Unknown Slow Consumer Policy specified:" + policyConfig.getPolicyName() + " Known Policies:" + factories.keySet()); + } + + _policyPlugin = pluginFactory.newInstance(policyConfig); + } + + private long getConfigurationValue(String property) + { + // The _configuration we are given is a munged configurated + // so the queue will already have queue-queues munging + + // we then need to ensure that the TopicsConfiguration + // and TopicConfiguration classes correctly munge their configuration: + // queue-queues -> topic-topics + + return _configuration.getLong(property, 0); + } + +} Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java Fri May 7 15:11:56 2010 @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; + +import org.apache.qpid.server.queue.AMQQueue; + +public interface SlowConsumerPolicyPlugin +{ + public void performPolicy(AMQQueue Queue); +} Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java Fri May 7 15:11:56 2010 @@ -0,0 +1,29 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; + +import org.apache.qpid.server.plugins.PluginFactory; + +public interface SlowConsumerPolicyPluginFactory extends PluginFactory +{ + + public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration); +} Added: qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java?rev=942108&view=auto ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java (added) +++ qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java Fri May 7 15:11:56 2010 @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost.plugin.SlowConsumer; + +import org.apache.log4j.Logger; +import org.apache.qpid.AMQException; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.TopicExchange; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.queue.AMQQueue; + +public class TopicDeletePolicy implements SlowConsumerPolicyPlugin +{ + Logger _logger = Logger.getLogger(TopicDeletePolicy.class); + private SlowConsumerDetectionPolicyConfiguration _configuration; + + public static class DeletePolicyFactory implements SlowConsumerPolicyPluginFactory + { + + public SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration) + { + return new TopicDeletePolicy(configuration); + } + + public String getPluginName() + { + return "topicdelete"; + } + } + + public TopicDeletePolicy(SlowConsumerDetectionPolicyConfiguration config) + { + _configuration = config; + } + + public void performPolicy(AMQQueue q) + { + AMQSessionModel owner = q.getExclusiveOwningSession(); + + // Only process exclusive queues + if (owner == null) + { + return; + } + + //Only process Topics + if(!validateQueueIsATopic(q)) + { + return; + } + + try + { + owner.getConnectionModel(). + closeSession(owner, AMQConstant.RESOURCE_ERROR, + "Consuming to slow."); + + String option = _configuration.getOption("delete-persistent"); + + boolean deletePersistent = option != null && Boolean.parseBoolean(option); + + if (!q.isAutoDelete() && deletePersistent) + { + q.delete(); + } + + } + catch (AMQException e) + { + _logger.warn("Unable to close consumer:" + owner + ", on queue:" + q.getName()); + } + + } + + /** + * Check the queue bindings to validate the queue is bound to the + * topic exchange. + * + * @param q the Queue + * @return true iff Q is bound to a TopicExchange + */ + private boolean validateQueueIsATopic(AMQQueue q) + { + for (Binding binding : q.getBindings()) + { + if (binding.getExchange() instanceof TopicExchange) + { + return true; + } + } + + return false; + } +} Modified: qpid/trunk/qpid/java/test-profiles/Excludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=942108&r1=942107&r2=942108&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/Excludes (original) +++ qpid/trunk/qpid/java/test-profiles/Excludes Fri May 7 15:11:56 2010 @@ -31,3 +31,6 @@ org.apache.qpid.test.unit.ack.Acknowledg // QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail. org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart + +// QPID-1447 : Work In Progress +org.apache.qpid.systest.SlowConsumerTest#* --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org