This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch activemq-5.17.x in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.17.x by this push: new 4ac6065ac AMQ-9192 - Fix flaky AdvisoryTests 4ac6065ac is described below commit 4ac6065acee402e7b002a9418d303a76ba2c4f85 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> AuthorDate: Tue Jan 10 12:29:33 2023 -0500 AMQ-9192 - Fix flaky AdvisoryTests Properly shut down the broker for each test and speed up tests by sending fewer messages (cherry picked from commit 044f5346e93d9b133fcecccb6000d16967aa0f1e) --- .../org/apache/activemq/advisory/AdvisoryTests.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java index 9207fb0b4..f086ff3f1 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java @@ -66,13 +66,14 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class AdvisoryTests { - protected static final int MESSAGE_COUNT = 2000; + protected static final int MESSAGE_COUNT = 100; protected BrokerService broker; protected Connection connection; protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL; protected final boolean includeBodyForAdvisory; protected final boolean persistent; protected final int EXPIRE_MESSAGE_PERIOD = 3000; + protected final int DEFAULT_PREFETCH = 50; @Parameters(name = "includeBodyForAdvisory={0}, persistent={1}") public static Collection<Object[]> data() { @@ -126,7 +127,7 @@ public class AdvisoryTests { @Test(timeout = 60000) public void testTopicSlowConsumerAdvisory() throws Exception { - broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(500); + broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(10); 
broker.getDestinationPolicy().getDefaultEntry().setPendingMessageLimitStrategy(null); testSlowConsumerAdvisory(new ActiveMQTopic(getClass().getName())); } @@ -496,8 +497,7 @@ public class AdvisoryTests { MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic); // start throwing messages at the consumer MessageProducer producer = s.createProducer(topic); - int count = (new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2); - for (int i = 0; i < count; i++) { + for (int i = 0; i < MESSAGE_COUNT; i++) { BytesMessage m = s.createBytesMessage(); m.writeBytes(new byte[1024]); producer.send(m); @@ -571,7 +571,11 @@ public class AdvisoryTests { @After public void tearDown() throws Exception { - connection.close(); + try { + connection.close(); + } catch (Exception e) { + //swallow exception so we can still stop the broker even on error + } if (broker != null) { broker.stop(); } @@ -602,6 +606,10 @@ public class AdvisoryTests { policy.setAdvisoryWhenFull(true); policy.setIncludeBodyForAdvisory(includeBodyForAdvisory); policy.setProducerFlowControl(false); + policy.setDurableTopicPrefetch(DEFAULT_PREFETCH); + policy.setTopicPrefetch(DEFAULT_PREFETCH); + policy.setQueuePrefetch(DEFAULT_PREFETCH); + ConstantPendingMessageLimitStrategy strategy = new ConstantPendingMessageLimitStrategy(); strategy.setLimit(10); policy.setPendingMessageLimitStrategy(strategy);