Author: ritchiem
Date: Fri May  7 15:11:56 2010
New Revision: 942108

URL: http://svn.apache.org/viewvc?rev=942108&view=rev
Log:
QPID-1447 : Exclude SCD testing until complete

Added:
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
Modified:
    qpid/trunk/qpid/java/test-profiles/Excludes

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/MANIFEST.MF
 Fri May  7 15:11:56 2010
@@ -0,0 +1,22 @@
+Manifest-Version: 1.0
+Bundle-ManifestVersion: 2
+Bundle-Name: Qpid Slow Consumer Detection
+Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true
+Bundle-Version: 1.0.0
+Bundle-Activator: org.apache.qpid.server.virtualhost.plugin.SlowConsumer.Activator
+Import-Package: org.osgi.framework,
+ org.apache.qpid.server.configuration.plugin,
+ org.apache.qpid.server.configuration,
+ org.apache.qpid.server.virtualhost.plugin,
+ org.apache.qpid.server.virtualhost,
+ org.apache.qpid.server.queue,
+ org.apache.qpid.server.registry,
+ org.apache.qpid.server.plugins,
+ org.apache.qpid,
+ org.apache.log4j,
+ org.apache.commons.configuration
+Bundle-RequiredExecutionEnvironment: JavaSE-1.6
+Bundle-ClassPath: .
+Bundle-ActivationPolicy: lazy
+Export-Package: org.apache.qpid.server.virtualhost.plugin.SlowConsumer;uses:="org.osgi.framework"
+

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/build.xml
 Fri May  7 15:11:56 2010
@@ -0,0 +1,32 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements.  See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="Slow Consumer Disconnect" default="build">
+
+    <property name="module.depends" value="common broker broker-plugins"/>
+    <property name="module.test.depends" value="broker/test systests client 
management/common"/>
+    <property name="module.manifest" value="MANIFEST.MF"/>
+    <property name="module.plugin" value="true"/>
+
+    <import file="../../../module.xml"/>
+
+    <target name="bundle" depends="bundle-tasks"/>
+
+</project>

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/Activator.java
 Fri May  7 15:11:56 2010
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory;
+import org.osgi.framework.BundleActivator;
+import org.osgi.framework.BundleContext;
+
+/**
+ * OSGi bundle activator for the Slow Consumer Detection plugin.
+ *
+ * Starting the bundle publishes the plugin's configuration factories, its
+ * virtual host plugin factory and the topic delete policy factory as OSGi
+ * services.
+ *
+ * @author ritchiem
+ */
+public class Activator implements BundleActivator
+{
+    /**
+     * Registers every factory supplied by this bundle as an OSGi service.
+     *
+     * @param ctx the context of the starting bundle; when null nothing is registered
+     *
+     * @throws Exception declared by the BundleActivator contract
+     */
+    public void start(BundleContext ctx) throws Exception
+    {
+        if (ctx == null)
+        {
+            return;
+        }
+
+        // All three configuration factories are published under the same service name.
+        final String configurationService = ConfigurationPluginFactory.class.getName();
+
+        ctx.registerService(configurationService,
+                            new SlowConsumerDetectionQueueConfiguration.SlowConsumerDetectionQueueConfigurationFactory(),
+                            null);
+        ctx.registerService(configurationService,
+                            new SlowConsumerDetectionConfiguration.SlowConsumerDetectionConfigurationFactory(),
+                            null);
+        ctx.registerService(configurationService,
+                            new SlowConsumerDetectionPolicyConfiguration.SlowConsumerDetectionPolicyConfigurationFactory(),
+                            null);
+
+        ctx.registerService(VirtualHostPluginFactory.class.getName(),
+                            new SlowConsumerDetection.SlowConsumerFactory(),
+                            null);
+
+        ctx.registerService(SlowConsumerPolicyPluginFactory.class.getName(),
+                            new TopicDeletePolicy.DeletePolicyFactory(),
+                            null);
+    }
+
+    /**
+     * Nothing to clean up explicitly when the bundle stops.
+     *
+     * @param ctx the context of the stopping bundle (unused)
+     *
+     * @throws Exception declared by the BundleActivator contract
+     */
+    public void stop(BundleContext ctx) throws Exception
+    {
+        // no need to do anything here, osgi will unregister the service for us
+    }
+
+}

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetection.java
 Fri May  7 15:11:56 2010
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPlugin;
+import org.apache.qpid.server.virtualhost.plugin.VirtualHostPluginFactory;
+
+class SlowConsumerDetection implements VirtualHostPlugin
+{
+    Logger _logger = Logger.getLogger(SlowConsumerDetection.class);
+    private VirtualHost _virtualhost;
+    private SlowConsumerDetectionConfiguration _config;
+    private SlowConsumerPolicyPlugin _policy;
+
+    public static class SlowConsumerFactory implements VirtualHostPluginFactory
+    {
+        public VirtualHostPlugin newInstance(VirtualHost vhost)
+        {
+            return new SlowConsumerDetection(vhost);
+        }
+    }
+
+    public SlowConsumerDetection(VirtualHost vhost)
+    {
+        _virtualhost = vhost;
+        _config = 
vhost.getConfiguration().getConfiguration(SlowConsumerDetectionConfiguration.class);
+        if (_config == null)
+        {
+            throw new IllegalArgumentException("Plugin has not been 
configured");
+        }
+
+    }
+
+    public void run()
+    {
+        _logger.info("Starting the SlowConsumersDetection job");
+        try
+        {
+            for (AMQQueue q : _virtualhost.getQueueRegistry().getQueues())
+            {
+                _logger.debug("Checking consumer status for queue: "
+                              + q.getName());
+                try
+                {
+                    SlowConsumerDetectionQueueConfiguration config =
+                            
q.getConfiguration().getConfiguration(SlowConsumerDetectionQueueConfiguration.class);
+
+                    if (checkQueueStatus(q, config))
+                    {
+                        config.getPolicy().performPolicy(q);
+                    }
+                }
+                catch (Exception e)
+                {
+                    _logger.error("Exception in SlowConsumersDetection " +
+                                  "for queue: " +
+                                  q.getNameShortString().toString(), e);
+                    //Don't throw exceptions as this will stop the
+                    // house keeping task from running.
+                }
+            }
+            _logger.info("SlowConsumersDetection job completed.");
+        }
+        catch (Exception e)
+        {
+            _logger.error("SlowConsumersDetection job failed: " + 
e.getMessage(), e);
+        }
+        catch (Error e)
+        {
+            _logger.error("SlowConsumersDetection job failed with error: " + 
e.getMessage(), e);
+        }
+    }
+
+    public long getDelay()
+    {
+        return _config.getDelay();
+    }
+
+    public String getTimeUnit()
+    {
+        return _config.getTimeUnit();
+    }
+
+    /**
+     * Check the depth,messageSize,messageAge,messageCount values for this q
+     *
+     * @param q      the queue to check
+     * @param config
+     *
+     * @return true if the queue has reached a threshold.
+     */
+    private boolean checkQueueStatus(AMQQueue q, 
SlowConsumerDetectionQueueConfiguration config)
+    {
+
+        _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
+
+        return config != null &&
+               (q.getMessageCount() >= config.getMessageCount() ||
+                q.getQueueDepth() >= config.getDepth() ||
+                q.getOldestMessageArrivalTime() >= config.getMessageAge());
+    }
+}

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionConfiguration.java
 Fri May  7 15:11:56 2010
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Virtual host level configuration of the slow consumer detection plugin:
+ * the delay and time unit read from the "slow-consumer-detection" element.
+ */
+public class SlowConsumerDetectionConfiguration extends ConfigurationPlugin
+{
+    /** Factory that builds this configuration for the virtual host element. */
+    public static class SlowConsumerDetectionConfigurationFactory implements ConfigurationPluginFactory
+    {
+        /**
+         * @param path   the configuration path being processed
+         * @param config the configuration found at that path
+         *
+         * @return a SlowConsumerDetectionConfiguration initialised from config
+         *
+         * @throws ConfigurationException if the configuration is rejected
+         */
+        public ConfigurationPlugin newInstance(String path, Configuration config) throws ConfigurationException
+        {
+            SlowConsumerDetectionConfiguration slowConsumerConfig = new SlowConsumerDetectionConfiguration();
+            slowConsumerConfig.setConfiguration(path, config);
+            return slowConsumerConfig;
+        }
+
+        /**
+         * @return the configuration paths this factory is responsible for
+         */
+        public String[] getParentPaths()
+        {
+            return new String[]{"virtualhosts.virtualhost.slow-consumer-detection"};
+        }
+    }
+
+    /**
+     * @return the names of the configuration elements handled by this plugin
+     */
+    public String[] getElementsProcessed()
+    {
+        return new String[]{"delay",
+                            "timeunit"};
+    }
+
+    /**
+     * @return the configured "delay", or 10 when it is not set
+     */
+    public long getDelay()
+    {
+        return _configuration.getLong("delay", 10);
+    }
+
+    /**
+     * @return the configured "timeunit", or the name of TimeUnit.SECONDS when
+     *         it is not set
+     */
+    public String getTimeUnit()
+    {
+        return _configuration.getString("timeunit", TimeUnit.SECONDS.toString());
+    }
+
+    /**
+     * Stores the configuration and reports the effective values at debug level.
+     *
+     * @param path          the configuration path being processed
+     * @param configuration the configuration found at that path
+     *
+     * @throws ConfigurationException if the parent class rejects the configuration
+     */
+    @Override
+    public void setConfiguration(String path, Configuration configuration) throws ConfigurationException
+    {
+        super.setConfiguration(path, configuration);
+
+        // Report through the inherited logger, as the queue level
+        // configuration does, instead of writing to standard out.
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Configured SCDC");
+            _logger.debug("Delay:" + getDelay());
+            _logger.debug("TimeUnit:" + getTimeUnit());
+        }
+    }
+}

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionPolicyConfiguration.java
 Fri May  7 15:11:56 2010
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+
+import java.util.List;
+
+public class SlowConsumerDetectionPolicyConfiguration extends 
ConfigurationPlugin
+{
+
+    public static class SlowConsumerDetectionPolicyConfigurationFactory
+            implements ConfigurationPluginFactory
+    {
+        public ConfigurationPlugin newInstance(String path,
+                                               Configuration config)
+                throws ConfigurationException
+        {
+            SlowConsumerDetectionPolicyConfiguration slowConsumerConfig =
+                    new SlowConsumerDetectionPolicyConfiguration();
+            slowConsumerConfig.setConfiguration(path, config);
+            return slowConsumerConfig;
+        }
+
+        public String[] getParentPaths()
+        {
+            return new String[]{
+                    
"virtualhosts.virtualhost.queues.slow-consumer-detection.policy",
+                    
"virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy",
+                    
"virtualhosts.virtualhost.topics.slow-consumer-detection.policy",
+                    
"virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"};
+        }
+    }
+
+    public String[] getElementsProcessed()
+    {
+        // NOTE: the use of '@name]' rather than '[...@name]' this appears to 
be
+        // a bug in commons configuration.
+        //fixme - Simple test case needs raised and JIRA raised on Commons
+        return new String[]{"@name]", "options"};
+    }
+
+    public String getPolicyName()
+    {
+        return _configuration.getString("[...@name]");
+    }
+
+    public String getOption(String option)
+    {
+        List options = _configuration.getList("options.opti...@name]");
+
+        if (options != null && options.contains(option))
+        {
+            return _configuration.getString("options.opti...@value]" +
+                                            "(" + options.indexOf(option) + 
")");
+        }
+
+        return null;
+    }
+}

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerDetectionQueueConfiguration.java
 Fri May  7 15:11:56 2010
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugin.ConfigurationPluginFactory;
+import org.apache.qpid.server.plugins.PluginManager;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import java.util.Map;
+
+public class SlowConsumerDetectionQueueConfiguration extends 
ConfigurationPlugin
+{
+    private SlowConsumerPolicyPlugin _policyPlugin;
+
+    public static class SlowConsumerDetectionQueueConfigurationFactory 
implements ConfigurationPluginFactory
+    {
+        public ConfigurationPlugin newInstance(String path, Configuration 
config) throws ConfigurationException
+        {
+            SlowConsumerDetectionQueueConfiguration slowConsumerConfig = new 
SlowConsumerDetectionQueueConfiguration();
+            slowConsumerConfig.setConfiguration(path, config);
+            return slowConsumerConfig;
+        }
+
+        public String[] getParentPaths()
+        {
+            return new 
String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection",
+                                
"virtualhosts.virtualhost.queues.queue.slow-consumer-detection",
+                                
"virtualhosts.virtualhost.topics.slow-consumer-detection",
+                                
"virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"};
+        }
+
+    }
+
+    public String[] getElementsProcessed()
+    {
+        return new String[]{"messageAge",
+                            "depth",
+                            "messageCount"};
+    }
+
+    public int getMessageAge()
+    {
+        return (int) getConfigurationValue("messageAge");
+    }
+
+    public long getDepth()
+    {
+        return getConfigurationValue("depth");
+    }
+
+    public long getMessageCount()
+    {
+        return getConfigurationValue("messageCount");
+    }
+
+    public SlowConsumerPolicyPlugin getPolicy()
+    {
+        return _policyPlugin;
+    }
+
+    @Override
+    public void setConfiguration(String path, Configuration configuration) 
throws ConfigurationException
+    {
+        super.setConfiguration(path, configuration);
+
+        SlowConsumerDetectionPolicyConfiguration policyConfig = 
getConfiguration(SlowConsumerDetectionPolicyConfiguration.class);
+
+        PluginManager pluginManager = 
ApplicationRegistry.getInstance().getPluginManager();
+        Map<String, SlowConsumerPolicyPluginFactory> factories =
+                
pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Configured SCDQC");
+            _logger.debug("Age:" + getMessageAge());
+            _logger.debug("Depth:" + getDepth());
+            _logger.debug("Count:" + getMessageCount());
+            _logger.debug("Policy:" + policyConfig.getPolicyName());
+            _logger.debug("Available factories:" + factories);
+        }
+
+        SlowConsumerPolicyPluginFactory pluginFactory = 
factories.get(policyConfig.getPolicyName().toLowerCase());
+
+        if (pluginFactory == null)
+        {
+            throw new ConfigurationException("Unknown Slow Consumer Policy 
specified:" + policyConfig.getPolicyName() + " Known Policies:" + 
factories.keySet());
+        }
+
+        _policyPlugin = pluginFactory.newInstance(policyConfig);
+    }
+
+    private long getConfigurationValue(String property)
+    {
+        // The _configuration we are given is a munged configurated
+        // so the queue will already have queue-queues munging
+
+        // we then need to ensure that the TopicsConfiguration
+        // and TopicConfiguration classes correctly munge their configuration:
+        // queue-queues -> topic-topics
+
+        return _configuration.getLong(property, 0);
+    }
+
+}

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPlugin.java
 Fri May  7 15:11:56 2010
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * Action applied to a queue by SlowConsumerDetection once the queue has
+ * reached one of its configured slow consumer thresholds.
+ */
+public interface SlowConsumerPolicyPlugin
+{
+    /**
+     * Applies this policy to the given queue.
+     *
+     * @param queue the queue that has reached a threshold
+     */
+    public void performPolicy(AMQQueue queue);
+}

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/SlowConsumerPolicyPluginFactory.java
 Fri May  7 15:11:56 2010
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.qpid.server.plugins.PluginFactory;
+
+/**
+ * Plugin factory that creates the SlowConsumerPolicyPlugin selected by a
+ * policy configuration.
+ */
+public interface SlowConsumerPolicyPluginFactory extends PluginFactory
+{
+    /**
+     * Creates a policy plugin for the given policy configuration.
+     *
+     * @param configuration the policy section the new plugin is configured from
+     *
+     * @return the policy plugin to perform on queues using that configuration
+     */
+    SlowConsumerPolicyPlugin newInstance(SlowConsumerDetectionPolicyConfiguration configuration);
+}

Added: 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java?rev=942108&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
 (added)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/SlowConsumerDisconnect/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumer/TopicDeletePolicy.java
 Fri May  7 15:11:56 2010
@@ -0,0 +1,113 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhost.plugin.SlowConsumer;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * Slow consumer policy for exclusive queues bound to a {@link TopicExchange}.
+ *
+ * When performed, the policy closes the session that exclusively owns the
+ * queue and, if the "delete-persistent" configuration option parses as true,
+ * deletes the queue when it is not an auto-delete queue.
+ */
+public class TopicDeletePolicy implements SlowConsumerPolicyPlugin
+{
+    private static final Logger _logger = Logger.getLogger(TopicDeletePolicy.class);
+
+    private final SlowConsumerDetectionPolicyConfiguration _configuration;
+
+    /**
+     * Factory for {@link TopicDeletePolicy}; reports the plugin name
+     * "topicdelete".
+     */
+    public static class DeletePolicyFactory implements SlowConsumerPolicyPluginFactory
+    {
+        public SlowConsumerPolicyPlugin newInstance(
+                SlowConsumerDetectionPolicyConfiguration configuration)
+        {
+            return new TopicDeletePolicy(configuration);
+        }
+
+        public String getPluginName()
+        {
+            return "topicdelete";
+        }
+    }
+
+    /**
+     * @param config the policy configuration; its "delete-persistent" option
+     *               is read every time the policy is performed
+     */
+    public TopicDeletePolicy(SlowConsumerDetectionPolicyConfiguration config)
+    {
+        _configuration = config;
+    }
+
+    /**
+     * Applies the policy to the given queue.
+     *
+     * Queues without an exclusive owning session, or without a binding to a
+     * {@link TopicExchange}, are left untouched. Otherwise the owning session
+     * is closed with {@link AMQConstant#RESOURCE_ERROR} and the queue is
+     * deleted if it is not auto-delete and "delete-persistent" is enabled.
+     * An {@link AMQException} raised while doing so is logged at WARN level
+     * and not propagated.
+     *
+     * @param q the queue the policy is performed on
+     */
+    public void performPolicy(AMQQueue q)
+    {
+        AMQSessionModel owner = q.getExclusiveOwningSession();
+
+        // Only process exclusive queues (owner present) that are Topics.
+        if (owner == null || !validateQueueIsATopic(q))
+        {
+            return;
+        }
+
+        try
+        {
+            owner.getConnectionModel().
+                    closeSession(owner, AMQConstant.RESOURCE_ERROR,
+                                 "Consuming to slow.");
+
+            // Boolean.parseBoolean(null) is false, so an absent option
+            // leaves the queue in place.
+            boolean deletePersistent =
+                    Boolean.parseBoolean(_configuration.getOption("delete-persistent"));
+
+            if (!q.isAutoDelete() && deletePersistent)
+            {
+                q.delete();
+            }
+        }
+        catch (AMQException e)
+        {
+            // Hand the exception to the logger so the cause is not lost.
+            _logger.warn("Unable to close consumer:" + owner
+                         + ", on queue:" + q.getName(), e);
+        }
+    }
+
+    /**
+     * Check the queue bindings to validate the queue is bound to the
+     * topic exchange.
+     *
+     * @param q the Queue
+     * @return true iff Q has at least one binding whose exchange is a
+     *         TopicExchange
+     */
+    private boolean validateQueueIsATopic(AMQQueue q)
+    {
+        for (Binding binding : q.getBindings())
+        {
+            if (binding.getExchange() instanceof TopicExchange)
+            {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}

Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=942108&r1=942107&r2=942108&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Fri May  7 15:11:56 2010
@@ -31,3 +31,6 @@ org.apache.qpid.test.unit.ack.Acknowledg
 
 // QPID-2418 : The queue backing the dur sub is not currently deleted at 
subscription change, so the test will fail.
 
org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
+
+// QPID-1447 : Work In Progress
+org.apache.qpid.systest.SlowConsumerTest#*



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to