Author: ritchiem Date: Tue May 18 14:44:43 2010 New Revision: 945683 URL: http://svn.apache.org/viewvc?rev=945683&view=rev Log: QPID-1447 : Update Plugins to use changes to ConfigurationPlugin, Update Test to correctly run and prevent failover. Update excludes to include test in Java Broker runs but not CPP or 010.
Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java qpid/trunk/qpid/java/test-profiles/CPPExcludes qpid/trunk/qpid/java/test-profiles/Excludes Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF?rev=945683&r1=945682&r2=945683&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF (original) +++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/MANIFEST.MF Tue May 18 14:44:43 2010 @@ -3,20 +3,26 @@ Bundle-ManifestVersion: 2 Bundle-Name: Qpid Slow Consumer Detection Bundle-SymbolicName: qpid_slow_consumer_detection;singleton:=true Bundle-Version: 1.0.0 -Bundle-Activator: org.apache.qpid.server.virtualhost.plugins.Activator +Bundle-Activator: org.apache.qpid.server.virtualhost.plugin.Activator Import-Package: org.osgi.framework, org.apache.qpid.server.configuration.plugins, org.apache.qpid.server.configuration, org.apache.qpid.server.virtualhost.plugins, org.apache.qpid.server.virtualhost, org.apache.qpid.server.queue, + org.apache.qpid.server.binding, + org.apache.qpid.server.exchange, org.apache.qpid.server.registry, org.apache.qpid.server.plugins, + org.apache.qpid.server.protocol, + org.apache.qpid.protocol, + org.apache.qpid.framing, org.apache.qpid, org.apache.log4j, org.apache.commons.configuration Bundle-RequiredExecutionEnvironment: JavaSE-1.6 Bundle-ClassPath: . Bundle-ActivationPolicy: lazy -Export-Package: org.apache.qpid.server.virtualhost.plugins;uses:="org.osgi.framework" +Export-Package: org.apache.qpid.server.virtualhost.plugin;uses:="org.osgi.framework", + org.apache.qpid.server.virtualhost.plugin.policies Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java?rev=945683&r1=945682&r2=945683&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java (original) +++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionPolicyConfiguration.java Tue May 18 14:44:43 2010 @@ -47,18 +47,13 @@ public class SlowConsumerDetectionPolicy { return new String[]{ "virtualhosts.virtualhost.queues.slow-consumer-detection.policy", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy", - "virtualhosts.virtualhost.topics.slow-consumer-detection.policy", - "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection.policy"}; + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection.policy"}; } } public String[] getElementsProcessed() { - // NOTE: the use of '@name]' rather than '[...@name]' this appears to be - // a bug in commons configuration. - //fixme - Simple test case needs raised and JIRA raised on Commons - return new String[]{"@name]", "options"}; + return new String[]{"[...@name]", "options"}; } public String getPolicyName() Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java?rev=945683&r1=945682&r2=945683&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java (original) +++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/configuration/plugin/SlowConsumerDetectionQueueConfiguration.java Tue May 18 14:44:43 2010 @@ -29,7 +29,10 @@ import org.apache.qpid.server.registry.A import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPlugin; import org.apache.qpid.slowconsumerdetection.policies.SlowConsumerPolicyPluginFactory; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; +import java.util.Set; public class SlowConsumerDetectionQueueConfiguration extends ConfigurationPlugin { @@ -47,9 +50,7 @@ public class SlowConsumerDetectionQueueC public String[] getParentPaths() { return new String[]{"virtualhosts.virtualhost.queues.slow-consumer-detection", - "virtualhosts.virtualhost.queues.queue.slow-consumer-detection", - "virtualhosts.virtualhost.topics.slow-consumer-detection", - "virtualhosts.virtualhost.queues.topics.topic.slow-consumer-detection"}; + "virtualhosts.virtualhost.queues.queue.slow-consumer-detection"}; } } @@ -92,6 +93,21 @@ public class SlowConsumerDetectionQueueC Map<String, SlowConsumerPolicyPluginFactory> factories = pluginManager.getPlugins(SlowConsumerPolicyPluginFactory.class); + Iterator<?> keys = policyConfig.getConfig().getKeys(); + + while (keys.hasNext()) + { + String key = (String) keys.next(); + + _logger.debug("Policy Keys:" + key); + + } + + if (policyConfig == null) + { + throw new ConfigurationException("No Slow Consumer Policy specified at:" + path + ". Known Policies:" + factories.keySet()); + } + if (_logger.isDebugEnabled()) { _logger.debug("Configured SCDQC"); Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java?rev=945683&r1=945682&r2=945683&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java (original) +++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/main/java/org/apache/qpid/server/virtualhost/plugin/SlowConsumerDetection.java Tue May 18 14:44:43 2010 @@ -103,12 +103,26 @@ class SlowConsumerDetection extends Virt */ private boolean checkQueueStatus(AMQQueue q, SlowConsumerDetectionQueueConfiguration config) { + if (config != null) + { - _logger.info("Retrieved Queue(" + q.getName() + ") Config:" + config); - return config != null && - (q.getMessageCount() >= config.getMessageCount() || - q.getQueueDepth() >= config.getDepth() || - q.getOldestMessageArrivalTime() >= config.getMessageAge()); + if ((config.getMessageCount() != 0 && q.getMessageCount() >= config.getMessageCount()) || + (config.getDepth() != 0 && q.getQueueDepth() >= config.getDepth()) || + (config.getMessageAge() != 0 && q.getOldestMessageArrivalTime() >= config.getMessageAge())) + { + + if (_logger.isInfoEnabled()) + { + _logger.info("Detected Slow Consumer on Queue(" + q.getName() + ")"); + _logger.info("Queue Count:" + q.getMessageCount() + ":" + config.getMessageCount()); + _logger.info("Queue Depth:" + q.getQueueDepth() + ":" + config.getDepth()); + _logger.info("Queue Arrival:" + q.getOldestMessageArrivalTime() + ":" + config.getMessageAge()); + } + + return true; + } + } + return false; } } Modified: qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java?rev=945683&r1=945682&r2=945683&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java (original) +++ qpid/trunk/qpid/java/broker-plugins/experimental/slowconsumerdetection/src/test/java/org/apache/qpid/systest/SlowConsumerTest.java Tue May 18 14:44:43 2010 @@ -24,6 +24,7 @@ import org.apache.commons.configuration. import org.apache.qpid.AMQChannelClosedException; import org.apache.qpid.AMQException; import org.apache.qpid.client.AMQSession_0_10; +import org.apache.qpid.jms.ConnectionListener; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.test.utils.QpidTestCase; @@ -44,7 +45,7 @@ import java.util.concurrent.TimeUnit; * Slow consumers should on a topic should expect to receive a * 506 : Resource Error if the hit a predefined threshold. */ -public class SlowConsumerTest extends QpidTestCase implements ExceptionListener +public class SlowConsumerTest extends QpidTestCase implements ExceptionListener, ConnectionListener { Destination _destination; private CountDownLatch _disconnectionLatch = new CountDownLatch(1); @@ -55,6 +56,7 @@ public class SlowConsumerTest extends Qp private static final long DISCONNECTION_WAIT = 5; private Exception _publisherError = null; private JMSException _connectionException = null; + private static final long JOIN_WAIT = 5000; @Override public void setUp() throws Exception, ConfigurationException, NamingException @@ -68,30 +70,43 @@ public class SlowConsumerTest extends Qp + getConnectionURL().getVirtualHost().substring(1) + ".slow-consumer-detection.timeunit", "SECONDS"); - setConfigurationProperty("virtualhosts.virtualhost." + - getConnectionURL().getVirtualHost().substring(1) + - "queues.slow-consumer-detection." + + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.slow-consumer-detection." + "poli...@name]", "TopicDelete"); + setConfigurationProperty("virtualhosts.virtualhost." + + getConnectionURL().getVirtualHost().substring(1) + + ".queues.maximumMessageCount", "1"); + + + /** * Queue Configuration <slow-consumer-detection> <!-- The depth before which the policy will be applied--> - <depth>4235264</depth> + <depth>4235264</depth> - <!-- The message age before which the policy will be applied--> - <messageAge>600000</messageAge> + <!-- The message age before which the policy will be applied--> + <messageAge>600000</messageAge> - <!-- The number of message before which the policy will be applied--> - <messageCount>50</messageCount> + <!-- The number of message before which the policy will be applied--> + <messageCount>50</messageCount> - <!-- Policies configuration --> - <policy name="TopicDelete"> - <options> - <option name="delete-persistent" value="true"/> - </options> - </policy> + <!-- Policies configuration --> + <policy name="TopicDelete"> + <options> + <option name="delete-persistent" value="true"/> + </options> + </policy> + + <policy> + <name>TopicDelete"</name> + <topicDelete> + <delete-persistent/> + </topicDelete> + </policy> </slow-consumer-detection> */ @@ -105,8 +120,6 @@ public class SlowConsumerTest extends Qp </slow-consumer-detection> */ - - super.setUp(); } public void exclusiveTransientQueue(int ackMode) throws Exception @@ -167,8 +180,10 @@ public class SlowConsumerTest extends Qp System.err.println("Linked:" + linked); - _publisher.join(); + _publisher.join(JOIN_WAIT); + assertFalse("Publisher still running", _publisher.isAlive()); + //Validate publishing occurred ok if (_publisherError != null) { @@ -205,6 +220,7 @@ public class SlowConsumerTest extends Qp for (int count = 0; count < MAX_QUEUE_MESSAGE_COUNT; count++) { publisher.send(createNextMessage(session, count)); + session.commit(); } } catch (Exception e) @@ -214,15 +230,22 @@ public class SlowConsumerTest extends Qp } } }); + + _publisher.start(); } public void testAutoAckTopicConsumerMessageCount() throws Exception { MAX_QUEUE_MESSAGE_COUNT = 10; + setConfigurationProperty("virtualhosts.virtualhost." + getConnectionURL().getVirtualHost().substring(1) + - "queues.slow-consumer-detection" + - "messageCount", "9"); + ".queues.slow-consumer-detection." + + "messageCount", String.valueOf(MAX_QUEUE_MESSAGE_COUNT - 1)); + + //Start the broker + super.setUp(); + setMessageSize(MESSAGE_SIZE); @@ -233,6 +256,34 @@ public class SlowConsumerTest extends Qp { _connectionException = e; + System.out.println("***** SCT Received Exception: "+e); + e.printStackTrace(); + _disconnectionLatch.countDown(); } + + /// Connection Listener + + public void bytesSent(long count) + { + } + + public void bytesReceived(long count) + { + } + + public boolean preFailover(boolean redirect) + { + // Prevent Failover + return false; + } + + public boolean preResubscribe() + { + return false; + } + + public void failoverComplete() + { + } } Modified: qpid/trunk/qpid/java/test-profiles/CPPExcludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/CPPExcludes?rev=945683&r1=945682&r2=945683&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/CPPExcludes (original) +++ qpid/trunk/qpid/java/test-profiles/CPPExcludes Tue May 18 14:44:43 2010 @@ -119,3 +119,7 @@ org.apache.qpid.server.queue.ConflationQ # Temporarily adding the following until the issues are sorted out. org.apache.qpid.test.unit.client.AMQConnectionTest#testHeartBeat + +//Excluded due to QPID-1447 : CPP broker does not have SlowConsumer Disconnection +org.apache.qpid.systest.SlowConsumerTest#* + Modified: qpid/trunk/qpid/java/test-profiles/Excludes URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/test-profiles/Excludes?rev=945683&r1=945682&r2=945683&view=diff ============================================================================== --- qpid/trunk/qpid/java/test-profiles/Excludes (original) +++ qpid/trunk/qpid/java/test-profiles/Excludes Tue May 18 14:44:43 2010 @@ -32,5 +32,3 @@ org.apache.qpid.test.unit.ack.Acknowledg // QPID-2418 : The queue backing the dur sub is not currently deleted at subscription change, so the test will fail. org.apache.qpid.test.unit.ct.DurableSubscriberTest#testResubscribeWithChangedSelectorAndRestart -// QPID-1447 : Work In Progress -org.apache.qpid.systest.SlowConsumerTest#* --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org