Author: ritchiem
Date: Tue May 18 14:44:43 2010
New Revision: 945683

URL: http://svn.apache.org/viewvc?rev=945683&view=rev
Log:
QPID-1447 : Update Plugins to use changes to ConfigurationPlugin, Update Test 
to correctly run and prevent failover.
Update excludes to include test in Java Broker runs but not CPP or 010.

Modified:
    
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
    
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
    
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes
    qpid/trunk/qpid/java/test-profiles/Excludes

Modified: 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF?rev=945683&r1=945682&r2=945683&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF
 Tue May 18 14:44:43 2010
@@ -3,20 +3,26 @@ Bundle-ManifestVersion: 2
 Bundle-Name: Qpid Slow Consumer Detection
 Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true
 Bundle-Version: 1.0.0
-Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator
+Bundle-Activator: org.apache.qpid.server.virtualhost.plugin.Activator
 Import-Package: org.osgi.framework,
  org.apache.qpid.server.configuration.plugins,
  org.apache.qpid.server.configuration,
  org.apache.qpid.server.virtualhost.plugins,
  org.apache.qpid.server.virtualhost,
  org.apache.qpid.server.queue,
+ org.apache.qpid.server.binding,
+ org.apache.qpid.server.exchange,
  org.apache.qpid.server.registry,
  org.apache.qpid.server.plugins,
+ org.apache.qpid.server.protocol,
+ org.apache.qpid.protocol,
+ org.apache.qpid.framing,
  org.apache.qpid,
  org.apache.log4j,
  org.apache.commons.configuration 
 Bundle-RequiredExecutionEnvironment: JavaSE-1.6
 Bundle-ClassPath: .
 Bundle-ActivationPolicy: lazy
-Export-Package: 
org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework"
+Export-Package: 
org.apache.qpid.server.virtualhost.plugin;uses:="org.osgi.framework",
+ org.apache.qpid.server.virtualhost.plugin.policies
 

Modified: 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java?rev=945683&r1=945682&r2=945683&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java
 Tue May 18 14:44:43 2010
@@ -47,18 +47,13 @@ public class SlowConsumerDetectionPolicy
         {
             return new String[]{
                     
"virtualhosts.virtualhost.queues.slow-consumer-detection.policy",
-                    
"virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy",
-                    
"virtualhosts.virtualhost.topics.slow-consumer-detection.policy",
-                    
"virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"};
+                    
"virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy"};
         }
     }
 
     public String[] getElementsProcessed()
     {
-        // NOTE: the use of '@name]' rather than '[@name]' this appears to 
be
-        // a bug in commons configuration.
-        //fixme - Simple test case needs raised and JIRA raised on Commons
-        return new String[]{"@name]", "options"};
+        return new String[]{"[@name]", "options"};
     }
 
     public String getPolicyName()

Modified: 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java?rev=945683&r1=945682&r2=945683&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java
 Tue May 18 14:44:43 2010
@@ -29,7 +29,10 @@ import org.apache.qpid.server.registry.A
 import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin;
 import 
org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory;
 
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 
 public class SlowConsumerDetectionQueueConfiguration extends 
ConfigurationPlugin
 {
@@ -47,9 +50,7 @@ public class SlowConsumerDetectionQueueC
         public String[] getParentPaths()
         {
             return new 
String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection",
-                                
"virtualhosts.virtualhost.queues.queue.slow-consumer-detection",
-                                
"virtualhosts.virtualhost.topics.slow-consumer-detection",
-                                
"virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"};
+                                
"virtualhosts.virtualhost.queues.queue.slow-consumer-detection"};
         }
 
     }
@@ -92,6 +93,21 @@ public class SlowConsumerDetectionQueueC
         Map<String, SlowConsumerPolicyPluginFactory> factories =
                 
pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class);
 
+        Iterator<?> keys = policyConfig.getConfig().getKeys();
+
+        while (keys.hasNext())
+        {
+            String key = (String) keys.next();
+
+            _logger.debug("Policy Keys:" + key);
+
+        }
+
+        if (policyConfig == null)
+        {
+            throw new ConfigurationException("No Slow Consumer Policy 
specified at:" + path + ". Known Policies:" + factories.keySet());
+        }        
+
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Configured SCDQC");

Modified: 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java?rev=945683&r1=945682&r2=945683&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java
 Tue May 18 14:44:43 2010
@@ -103,12 +103,26 @@ class SlowConsumerDetection extends Virt
      */
     private boolean checkQueueStatus(AMQQueue q, 
SlowConsumerDetectionQueueConfiguration config)
     {
+        if (config != null)
+        {
 
-        _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config);
 
-        return config != null &&
-               (q.getMessageCount() >= config.getMessageCount() ||
-                q.getQueueDepth() >= config.getDepth() ||
-                q.getOldestMessageArrivalTime() >= config.getMessageAge());
+            if ((config.getMessageCount() != 0 && q.getMessageCount() >= 
config.getMessageCount()) ||
+                    (config.getDepth() != 0 && q.getQueueDepth() >= 
config.getDepth()) ||
+                    (config.getMessageAge() != 0 && 
q.getOldestMessageArrivalTime() >= config.getMessageAge()))
+            {
+                
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Detected Slow Consumer on Queue(" + 
q.getName() + ")");
+                    _logger.info("Queue Count:" + q.getMessageCount() + ":" + 
config.getMessageCount());
+                    _logger.info("Queue Depth:" + q.getQueueDepth() + ":" + 
config.getDepth());
+                    _logger.info("Queue Arrival:" + 
q.getOldestMessageArrivalTime() + ":" + config.getMessageAge());
+                }
+
+                return true;
+            }
+        }
+        return false;
     }
 }

Modified: 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java?rev=945683&r1=945682&r2=945683&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
 (original)
+++ 
qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java
 Tue May 18 14:44:43 2010
@@ -24,6 +24,7 @@ import org.apache.commons.configuration.
 import org.apache.qpid.AMQChannelClosedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQSession_0_10;
+import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.test.utils.QpidTestCase;
 
@@ -44,7 +45,7 @@ import java.util.concurrent.TimeUnit;
  * Slow consumers should on a topic should expect to receive a
  * 506 : Resource Error if the hit a predefined threshold.
  */
-public class SlowConsumerTest extends QpidTestCase implements ExceptionListener
+public class SlowConsumerTest extends QpidTestCase implements 
ExceptionListener, ConnectionListener
 {
     Destination _destination;
     private CountDownLatch _disconnectionLatch = new CountDownLatch(1);
@@ -55,6 +56,7 @@ public class SlowConsumerTest extends Qp
     private static final long DISCONNECTION_WAIT = 5;
     private Exception _publisherError = null;
     private JMSException _connectionException = null;
+    private static final long JOIN_WAIT = 5000;
 
     @Override
     public void setUp() throws Exception, ConfigurationException, 
NamingException
@@ -68,30 +70,43 @@ public class SlowConsumerTest extends Qp
                                  + 
getConnectionURL().getVirtualHost().substring(1) +
                                  ".slow-consumer-detection.timeunit", 
"SECONDS");
 
-        setConfigurationProperty("virtualhosts.virtualhost." +
-                                 
getConnectionURL().getVirtualHost().substring(1) +
-                                 "queues.slow-consumer-detection." +
+        setConfigurationProperty("virtualhosts.virtualhost."
+                                 + 
getConnectionURL().getVirtualHost().substring(1) +
+                                 ".queues.slow-consumer-detection." +
                                  "policy[@name]", "TopicDelete");
 
+        setConfigurationProperty("virtualhosts.virtualhost."
+                                 + 
getConnectionURL().getVirtualHost().substring(1) +
+                                 ".queues.maximumMessageCount", "1");
+
+
+
         /**
          *  Queue Configuration
 
          <slow-consumer-detection>
          <!-- The depth before which the policy will be applied-->
-         <depth>4235264</depth>
+             <depth>4235264</depth>
 
-         <!-- The message age before which the policy will be applied-->
-         <messageAge>600000</messageAge>
+             <!-- The message age before which the policy will be applied-->
+             <messageAge>600000</messageAge>
 
-         <!-- The number of message before which the policy will be applied-->
-         <messageCount>50</messageCount>
+             <!-- The number of message before which the policy will be 
applied-->
+             <messageCount>50</messageCount>
 
-         <!-- Policies configuration -->
-         <policy name="TopicDelete">
-         <options>
-         <option name="delete-persistent" value="true"/>
-         </options>
-         </policy>
+             <!-- Policies configuration -->
+             <policy name="TopicDelete">
+                     <options>
+                         <option name="delete-persistent" value="true"/>
+                     </options>
+             </policy>
+
+             <policy>
+                 <name>TopicDelete"</name>
+                 <topicDelete>
+                     <delete-persistent/>
+                 </topicDelete>
+             </policy>         
          </slow-consumer-detection>
 
          */
@@ -105,8 +120,6 @@ public class SlowConsumerTest extends Qp
          </slow-consumer-detection>
 
          */
-
-        super.setUp();
     }
 
     public void exclusiveTransientQueue(int ackMode) throws Exception
@@ -167,8 +180,10 @@ public class SlowConsumerTest extends Qp
 
         System.err.println("Linked:" + linked);
 
-        _publisher.join();
+        _publisher.join(JOIN_WAIT);
 
+        assertFalse("Publisher still running", _publisher.isAlive());
+        
         //Validate publishing occurred ok
         if (_publisherError != null)
         {
@@ -205,6 +220,7 @@ public class SlowConsumerTest extends Qp
                     for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; 
count++)
                     {
                         publisher.send(createNextMessage(session, count));
+                        session.commit();
                     }
                 }
                 catch (Exception e)
@@ -214,15 +230,22 @@ public class SlowConsumerTest extends Qp
                 }
             }
         });
+
+        _publisher.start();
     }
 
     public void testAutoAckTopicConsumerMessageCount() throws Exception
     {
         MAX_QUEUE_MESSAGE_COUNT = 10;
+
         setConfigurationProperty("virtualhosts.virtualhost." +
                                  
getConnectionURL().getVirtualHost().substring(1) +
-                                 "queues.slow-consumer-detection" +
-                                 "messageCount", "9");
+                                 ".queues.slow-consumer-detection." +
+                                 "messageCount", 
String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1));
+
+        //Start the broker
+        super.setUp();
+
 
         setMessageSize(MESSAGE_SIZE);
 
@@ -233,6 +256,34 @@ public class SlowConsumerTest extends Qp
     {
         _connectionException = e;
 
+        System.out.println("***** SCT Received Exception: "+e);
+        e.printStackTrace();
+
         _disconnectionLatch.countDown();
     }
+
+    /// Connection Listener
+
+    public void bytesSent(long count)
+    {
+    }
+
+    public void bytesReceived(long count)
+    {
+    }
+
+    public boolean preFailover(boolean redirect)
+    {
+        // Prevent Failover
+        return false;
+    }
+
+    public boolean preResubscribe()
+    {
+        return false;
+    }
+
+    public void failoverComplete()
+    {
+    }
 }

Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=945683&r1=945682&r2=945683&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original)
+++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Tue May 18 14:44:43 2010
@@ -119,3 +119,7 @@ org.apache.qpid.server.queue.ConflationQ
 
 # Temporarily adding the following until the issues are sorted out.
 org.apache.qpid.test.unit.client.AMQConnectionTest#testHeartBeat
+
+//Excluded due to QPID-1447 : CPP broker does not have SlowConsumer 
Disconnection
+org.apache.qpid.systest.SlowConsumerTest#*
+

Modified: qpid/trunk/qpid/java/test-profiles/Excludes
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=945683&r1=945682&r2=945683&view=diff
==============================================================================
--- qpid/trunk/qpid/java/test-profiles/Excludes (original)
+++ qpid/trunk/qpid/java/test-profiles/Excludes Tue May 18 14:44:43 2010
@@ -32,5 +32,3 @@ org.apache.qpid.test.unit.ack.Acknowledg
 // QPID-2418 : The queue backing the dur sub is not currently deleted at 
subscription change, so the test will fail.
 
org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart
 
-// QPID-1447 : Work In Progress
-org.apache.qpid.systest.SlowConsumerTest#*



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org

Reply via email to