/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import java.io.File;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends ten large messages to one queue, then starts two producer threads that
 * each send twenty small messages to a second queue while broker memory is
 * constrained. The test then drains the large queue and checks that every
 * small message is eventually sent and consumed.
 *
 * <p>The instance is also installed as the JVM default uncaught-exception
 * handler for the duration of each test so that stray failures are logged and
 * collected in {@link #exceptions}.
 */
@RunWith(BlockJUnit4ClassRunner.class)
public class MemoryUsageBlockResumeTest extends TestSupport implements Thread.UncaughtExceptionHandler {

    public int deliveryMode = DeliveryMode.PERSISTENT;

    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBlockResumeTest.class);
    private static final byte[] buf = new byte[4 * 1024];
    private static final byte[] bigBuf = new byte[48 * 1024];

    private BrokerService broker;
    AtomicInteger messagesSent = new AtomicInteger(0);
    AtomicInteger messagesConsumed = new AtomicInteger(0);

    protected long messageReceiveTimeout = 10000L;

    Destination destination = new ActiveMQQueue("FooTwo");
    Destination bigDestination = new ActiveMQQueue("FooTwoBig");

    private String connectionUri;
    private final Vector<Throwable> exceptions = new Vector<>();

    // Handler that was installed before setUp(); restored in tearDown() so this
    // test does not leak its handler into other tests running in the same JVM.
    private Thread.UncaughtExceptionHandler previousHandler;

    /**
     * Fills the "big" queue, starts two producers on the "small" queue, waits
     * until more than twenty messages have been sent overall, drains the big
     * queue, and finally verifies the sent and consumed totals.
     *
     * @throws Exception on any JMS or broker failure
     */
    @Test(timeout = 60 * 1000)
    public void testBlockByOtherResumeNoException() throws Exception {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);

        // ensure more than one message can be pending when full
        factory.setProducerWindowSize(48 * 1024);
        // ensure messages are spooled to disk for this consumer
        // NOTE(review): only the *topic* prefetch is lowered here although both
        // consumers below are queue consumers - confirm this is intended.
        ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
        prefetch.setTopicPrefetch(10);
        factory.setPrefetchPolicy(prefetch);

        Connection consumerConnection = null;
        Connection producerConnection = null;
        try {
            consumerConnection = factory.createConnection();
            consumerConnection.start();

            Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = consumerSession.createConsumer(bigDestination);

            producerConnection = factory.createConnection();
            producerConnection.start();

            final int fillWithBigCount = 10;
            Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(null);
            producer.setDeliveryMode(deliveryMode);
            for (int idx = 0; idx < fillWithBigCount; ++idx) {
                Message message = session.createTextMessage(new String(bigBuf) + idx);
                producer.send(bigDestination, message);
                messagesSent.incrementAndGet();
                LOG.info("After big: " + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
            }

            // will block on pfc
            final int toSend = 20;
            Thread producingThread = createProducingThread("Producing thread", producerConnection, toSend);
            producingThread.start();

            Thread producingThreadTwo = createProducingThread("Producing thread", producerConnection, toSend);
            producingThreadTwo.start();

            assertTrue("producer has sent x in a reasonable time", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    LOG.info("Checking for : X sent, System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + ", sent: " + messagesSent);
                    return messagesSent.get() > 20;
                }
            }));

            LOG.info("Consuming from big q to allow delivery to smaller q from pending");
            int count = 0;

            Message m = null;

            for (; count < 10; count++) {
                m = consumer.receive(messageReceiveTimeout);
                assertTrue(m != null);
                LOG.info("Received Message (" + count + "):" + m + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
                messagesConsumed.incrementAndGet();
            }
            consumer.close();

            producingThread.join();
            producingThreadTwo.join();

            // expected value first, actual value second, so a failure reads correctly
            assertEquals("Incorrect number of Messages Sent: " + messagesSent.get(), fillWithBigCount + toSend * 2, messagesSent.get());

            // consume all little messages
            consumer = consumerSession.createConsumer(destination);
            for (count = 0; count < toSend * 2; count++) {
                m = consumer.receive(messageReceiveTimeout);
                assertTrue(m != null);
                LOG.info("Received Message (" + count + "):" + m + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
                messagesConsumed.incrementAndGet();
            }

            assertEquals("Incorrect number of Messages consumed: " + messagesConsumed.get(), messagesSent.get(), messagesConsumed.get());

            // The original test had its "no exceptions" assertion disabled; keep it
            // non-fatal, but make any recorded failure visible in the log.
            if (!exceptions.isEmpty()) {
                LOG.warn("Exceptions recorded during test: " + exceptions);
            }
        } finally {
            closeQuietly(producerConnection);
            closeQuietly(consumerConnection);
        }
    }

    /**
     * Builds (but does not start) a thread that sends {@code count} small
     * persistent-or-not (per {@link #deliveryMode}) text messages to
     * {@link #destination} over its own session on the given connection.
     * Any failure is logged and recorded in {@link #exceptions} instead of
     * being printed to stderr and lost.
     */
    private Thread createProducingThread(String name, final Connection connection, final int count) {
        return new Thread(name) {
            @Override
            public void run() {
                try {
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer producer = session.createProducer(destination);
                    producer.setDeliveryMode(deliveryMode);
                    for (int idx = 0; idx < count; ++idx) {
                        Message message = session.createTextMessage(new String(buf) + idx);
                        producer.send(destination, message);
                        messagesSent.incrementAndGet();
                        LOG.info("After little:" + idx + ", System Memory Usage " + broker.getSystemUsage().getMemoryUsage().getPercentUsage());
                    }
                } catch (Throwable ex) {
                    LOG.error("Producer failed on: " + getName(), ex);
                    exceptions.add(ex);
                }
            }
        };
    }

    /** Closes a connection on a best-effort basis; a close failure must not mask the test result. */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (Exception ignored) {
            LOG.debug("Ignoring error while closing connection", ignored);
        }
    }

    /**
     * Installs this instance as the default uncaught-exception handler and
     * starts a persistent broker with a 30 * 16 KiB system memory limit and a
     * TCP connector on an ephemeral port.
     */
    @Override
    @Before
    public void setUp() throws Exception {

        previousHandler = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler(this);
        broker = new BrokerService();
        broker.setDataDirectory("target" + File.separator + "activemq-data");
        broker.setPersistent(true);
        broker.setUseJmx(false);
        broker.setAdvisorySupport(false);
        broker.setDeleteAllMessagesOnStartup(true);

        setDefaultPersistenceAdapter(broker);
        broker.getSystemUsage().getMemoryUsage().setLimit((30 * 16 * 1024));

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setOptimizedDispatch(true);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(policyMap);

        broker.addConnector("tcp://localhost:0");
        broker.start();

        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
    }

    /** Stops the broker and restores the uncaught-exception handler that was active before {@link #setUp()}. */
    @Override
    @After
    public void tearDown() throws Exception {
        try {
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        } finally {
            Thread.setDefaultUncaughtExceptionHandler(previousHandler);
        }
    }

    /** Logs and records any exception that escaped a thread while this test was running. */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        LOG.error("Unexpected Unhandeled ex on: " + t, e);
        exceptions.add(e);
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import java.io.File;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerTestSupport;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sends a large number of 1 KiB messages to a queue whose policy disables
 * producer flow control, and checks after every committed batch that the
 * broker never reports more than 100% memory usage.
 */
public class MemoryUsageBrokerTest extends BrokerTestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageBrokerTest.class);

    /** Total number of messages produced by {@link #testMemoryUsage()}. */
    private static final int MESSAGE_COUNT = 100000;
    /** Number of sends per transaction commit (and per memory-usage check). */
    private static final int COMMIT_BATCH_SIZE = 100;
    /** Payload shared by all messages; writeBytes is given the same zero-filled array each time. */
    private static final byte[] PAYLOAD = new byte[1024];

    @Override
    protected void setUp() throws Exception {
        this.setAutoFail(true);
        super.setUp();
    }

    /**
     * Returns the inherited default policy with producer flow control switched
     * off and a 128 MiB per-destination limit applied to all queues.
     */
    @Override
    protected PolicyEntry getDefaultPolicy() {
        PolicyEntry policy = super.getDefaultPolicy();
        // Disable PFC and assign a large memory limit that's larger than the default broker memory limit for queues
        policy.setProducerFlowControl(false);
        policy.setQueue(">");
        policy.setMemoryLimit(128 * 1024 * 1024);
        return policy;
    }

    /** Creates a broker backed by a freshly emptied KahaDB store under {@code target/activemq-data/kahadb}. */
    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        KahaDBStore kaha = new KahaDBStore();
        File directory = new File("target/activemq-data/kahadb");
        IOHelper.deleteChildren(directory);
        kaha.setDirectory(directory);
        kaha.deleteAllMessages();
        broker.setPersistenceAdapter(kaha);
        return broker;
    }

    protected ConnectionFactory createConnectionFactory() {
        return new ActiveMQConnectionFactory(broker.getVmConnectorURI());
    }

    protected Connection createJmsConnection() throws JMSException {
        return createConnectionFactory().createConnection();
    }

    /**
     * Produces {@link #MESSAGE_COUNT} bytes messages in transactions of
     * {@link #COMMIT_BATCH_SIZE} and asserts after each commit that broker
     * memory usage does not exceed 100%.
     *
     * @throws Exception on any JMS failure
     */
    public void testMemoryUsage() throws Exception {
        Connection conn = createJmsConnection();
        try {
            // A transacted session ignores the acknowledge mode; SESSION_TRANSACTED states the intent.
            Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue("queue.a.b");
            MessageProducer producer = session.createProducer(queue);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                BytesMessage bm = session.createBytesMessage();
                bm.writeBytes(PAYLOAD);
                producer.send(bm);
                if ((i + 1) % COMMIT_BATCH_SIZE == 0) {
                    session.commit();
                    int memoryUsagePercent = broker.getSystemUsage().getMemoryUsage().getPercentUsage();
                    LOG.info((i + 1) + " messages have been sent; broker memory usage " + memoryUsagePercent + "%");
                    assertTrue("Used more than available broker memory", memoryUsagePercent <= 100);
                }
            }
            session.commit();
            producer.close();
            session.close();
        } finally {
            // Close even when an assertion fails so the VM transport is released.
            conn.close();
        }
    }

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertTrue;

import java.io.File;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Floods a queue with non-persistent messages from ten producers while a
 * single consumer reads only one hundred of them, removes the queue, and then
 * checks that the broker's reported memory usage returns to (within one
 * percentage point of) its starting value.
 */
public class MemoryUsageCleanupTest {

    private static final Logger LOG = LoggerFactory.getLogger(MemoryUsageCleanupTest.class);
    private static final String QUEUE_NAME = MemoryUsageCleanupTest.class.getName() + "Queue";

    /** Number of concurrent producer tasks started per iteration. */
    private static final int SENDER_COUNT = 10;

    /** Alphabet from which random message bodies are drawn. */
    private final String str = "QAa0bcLdUK2eHfJgTP8XhiFj61DOklNm9nBoI5pGqYVrs3CtSuMZvwWx4yE7zR";

    private BrokerService broker;
    private String connectionUri;
    private ExecutorService pool;
    private String queueName;
    private Random r = new Random();

    /**
     * Starts a persistent, JMX-enabled broker with a 300,000,000-byte memory
     * limit (both system-wide and per destination) and creates the worker pool.
     */
    @Before
    public void setUp() throws Exception {

        broker = new BrokerService();
        broker.setDataDirectory("target" + File.separator + "activemq-data");
        broker.setPersistent(true);
        broker.setUseJmx(true);
        broker.setDedicatedTaskRunner(false);
        broker.setAdvisorySupport(false);
        broker.setDeleteAllMessagesOnStartup(true);

        SharedDeadLetterStrategy strategy = new SharedDeadLetterStrategy();
        strategy.setProcessExpired(false);
        strategy.setProcessNonPersistent(false);

        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setQueue(">");
        defaultPolicy.setOptimizedDispatch(true);
        defaultPolicy.setDeadLetterStrategy(strategy);
        defaultPolicy.setMemoryLimit(300000000);

        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(defaultPolicy);

        broker.setDestinationPolicy(policyMap);

        broker.getSystemUsage().getMemoryUsage().setLimit(300000000L);

        broker.addConnector("tcp://localhost:0").setName("Default");
        broker.start();
        broker.waitUntilStarted();

        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
        pool = Executors.newFixedThreadPool(SENDER_COUNT);
    }

    /** Stops the broker and then shuts the worker pool down. */
    @After
    public void tearDown() throws Exception {
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
        }

        if (pool != null) {
            pool.shutdown();
        }
    }

    /**
     * Runs two send/receive/destroy iterations, each against its own queue,
     * and then waits for the broker's memory usage to fall back to the value
     * observed at the start of the test (plus one percentage point).
     *
     * @throws Exception on broker or JMX failure
     */
    @Test
    public void testIt() throws Exception {

        final int startPercentage = broker.getAdminView().getMemoryPercentUsage();
        LOG.info("MemoryUsage at test start = " + startPercentage);

        for (int i = 0; i < 2; i++) {
            // Assign the name before logging it; the workers get an immutable copy
            // so a straggler from a previous iteration can never pick up a newer name.
            queueName = QUEUE_NAME + i;
            final String currentQueue = queueName;
            LOG.info("Started the test iteration: " + i + " using queueName = " + currentQueue);
            final CountDownLatch latch = new CountDownLatch(SENDER_COUNT + 1);

            pool.execute(new Runnable() {
                @Override
                public void run() {
                    receiveAndDiscard100messages(currentQueue, latch);
                }
            });

            for (int j = 0; j < SENDER_COUNT; j++) {
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        send10000messages(currentQueue, latch);
                    }
                });
            }

            LOG.info("Waiting on the send / receive latch");
            if (!latch.await(5, TimeUnit.MINUTES)) {
                // Carry on as before, but make the timeout visible instead of silent.
                LOG.warn("Timed out waiting on the send / receive latch; outstanding tasks = " + latch.getCount());
            }
            LOG.info("Resumed");

            destroyQueue();
            TimeUnit.SECONDS.sleep(2);
        }

        LOG.info("MemoryUsage before awaiting temp store cleanup = " + broker.getAdminView().getMemoryPercentUsage());

        boolean recovered = Wait.waitFor(new Wait.Condition() {

            @Override
            public boolean isSatisified() throws Exception {
                return broker.getAdminView().getMemoryPercentUsage() <= startPercentage + 1;
            }
        });

        // Sample the usage after waiting so that a failure message reports the value that actually failed.
        assertTrue("MemoryUsage should return to: " + startPercentage +
            "% but was " + broker.getAdminView().getMemoryPercentUsage() + "%", recovered);

        int endPercentage = broker.getAdminView().getMemoryPercentUsage();
        LOG.info("MemoryUsage at test end = " + endPercentage);
    }

    /**
     * Removes the queue named by the current {@code queueName} from the broker,
     * unless the broker has already stopped. Failures are logged, not thrown.
     */
    public void destroyQueue() {
        try {
            Broker regionBroker = this.broker.getBroker();
            if (!regionBroker.isStopped()) {
                LOG.info("Removing: " + queueName);
                regionBroker.removeDestination(this.broker.getAdminConnectionContext(), new ActiveMQQueue(queueName), 10);
            }
        } catch (Exception e) {
            LOG.warn("Got an error while removing the test queue", e);
        }
    }

    /**
     * Sends 10000 non-persistent text messages of 1000 random characters to
     * {@code queue}, pausing 10 ms between sends, then counts the latch down.
     * The latch is released in {@code finally} so the test cannot stall on a
     * worker that failed unexpectedly.
     */
    private void send10000messages(String queue, CountDownLatch latch) {
        ActiveMQConnection activeMQConnection = null;
        try {
            activeMQConnection = createConnection(null);
            Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(queue));
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            activeMQConnection.start();
            for (int i = 0; i < 10000; i++) {
                TextMessage textMessage = session.createTextMessage();
                textMessage.setText(generateBody(1000));
                textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.send(textMessage);
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the executor and stop producing.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            producer.close();
        } catch (JMSException e) {
            LOG.warn("Got an error while sending the messages", e);
        } finally {
            closeQuietly(activeMQConnection);
            latch.countDown();
        }
    }

    /**
     * Receives (blocking, without timeout) and discards 100 messages from
     * {@code queue}, then counts the latch down.
     */
    private void receiveAndDiscard100messages(String queue, CountDownLatch latch) {
        ActiveMQConnection activeMQConnection = null;
        try {
            activeMQConnection = createConnection(null);
            Session session = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer messageConsumer = session.createConsumer(session.createQueue(queue));
            activeMQConnection.start();
            for (int i = 0; i < 100; i++) {
                messageConsumer.receive();
            }
            messageConsumer.close();
            LOG.info("Created and disconnected");
        } catch (JMSException e) {
            LOG.warn("Got an error while receiving the messages", e);
        } finally {
            closeQuietly(activeMQConnection);
            latch.countDown();
        }
    }

    /** Closes the connection if present; a close failure is only of diagnostic interest here. */
    private static void closeQuietly(ActiveMQConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ignored) {
            LOG.debug("Ignoring error while closing connection", ignored);
        }
    }

    /**
     * Opens a new (not yet started) connection to the test broker.
     *
     * @param id client id to set on the factory, or {@code null} for none
     */
    private ActiveMQConnection createConnection(String id) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
        if (id != null) {
            factory.setClientID(id);
        }

        return (ActiveMQConnection) factory.createConnection();
    }

    /** Returns a string of {@code length} characters drawn uniformly at random from the alphabet. */
    private String generateBody(int length) {
        StringBuilder sb = new StringBuilder(Math.max(length, 0));
        for (int i = 1; i <= length; i++) {
            sb.append(str.charAt(r.nextInt(str.length())));
        }
        return sb.toString();
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Test to determine if expired messages are being reaped if there is
 * no active consumer connected to the broker.
 */
public class MessageExpirationReaperTest {

    private BrokerService broker;
    private ConnectionFactory factory;
    private ActiveMQConnection connection;
    private final String destinationName = "TEST.Q";
    private final String brokerUrl = "tcp://localhost:0";
    private final String brokerName = "testBroker";
    private String connectionUri;

    /** Starts the broker and opens a started connection with client id {@code test-connection}. */
    @Before
    public void init() throws Exception {
        createBroker();

        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();

        factory = createConnectionFactory();
        connection = (ActiveMQConnection) factory.createConnection();
        connection.setClientID("test-connection");
        connection.start();
    }

    /**
     * Closes the connection and stops the broker. Both steps are null-safe and
     * the broker is stopped even when closing the connection fails, so a
     * partially failed {@link #init()} is not masked by a secondary error here.
     */
    @After
    public void cleanUp() throws Exception {
        try {
            if (connection != null) {
                connection.close();
            }
        } finally {
            if (broker != null) {
                broker.stop();
            }
        }
    }

    /** Creates and starts a broker whose default policy checks for expired messages every 500 ms. */
    protected void createBroker() throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setBrokerName(brokerName);
        broker.addConnector(brokerUrl);

        PolicyMap policyMap = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(500);
        policyMap.setDefaultEntry(defaultEntry);
        broker.setDestinationPolicy(policyMap);

        broker.start();
    }

    protected ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(connectionUri);
    }

    protected Session createSession() throws Exception {
        return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Sends messages with a one-second time-to-live to a queue that has no
     * consumer, waits two seconds, and checks through JMX that the queue is
     * empty and every enqueued message was counted as expired. Repeats the
     * send and then verifies that a browser sees no messages.
     */
    @Test
    public void testExpiredMessageReaping() throws Exception {

        Session producerSession = createSession();
        ActiveMQDestination destination = (ActiveMQDestination) producerSession.createQueue(destinationName);
        MessageProducer producer = producerSession.createProducer(destination);
        producer.setTimeToLive(1000);

        final int count = 3;
        // Send some messages with an expiration
        for (int i = 0; i < count; i++) {
            TextMessage message = producerSession.createTextMessage("" + i);
            producer.send(message);
        }

        // Let the messages expire
        Thread.sleep(2000);

        DestinationViewMBean view = createView(destination);

        assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
        assertEquals("Incorrect queue size count", 0, view.getQueueSize());
        assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());

        // Send more messages with an expiration
        for (int i = 0; i < count; i++) {
            TextMessage message = producerSession.createTextMessage("" + i);
            producer.send(message);
        }

        // Let the messages expire
        Thread.sleep(2000);

        // Simply browse the queue
        Session browserSession = createSession();
        QueueBrowser browser = browserSession.createBrowser((Queue) destination);
        assertFalse("no message in the browser", browser.getEnumeration().hasMoreElements());

        // The messages expire and should be reaped because of the presence of
        // the queue browser
        assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
    }

    /**
     * Sends messages with a 500 ms time-to-live to a topic that has a durable
     * subscriber, closes the subscriber, waits two seconds, and checks through
     * JMX that every enqueued message was counted as expired.
     */
    @Test
    public void testExpiredMessagesOnTopic() throws Exception {
        Session session = createSession();

        // use a zero prefetch so messages don't go inflight
        ActiveMQTopic destination = new ActiveMQTopic(destinationName + "?consumer.prefetchSize=0");

        MessageProducer producer = session.createProducer(destination);

        // should have a durable sub because it's a little tricky to get messages to expire in
        // non-durable subs.. with durable subs, we can just expire in the topic using the expire
        // period.. also.. durable sub has to be "inactive" for the expire checker to actually
        // expire the messages
        MessageConsumer consumer = session.createDurableSubscriber(destination, "test-durable");

        producer.setTimeToLive(500);

        final int count = 3;
        // Send some messages with an expiration
        for (int i = 0; i < count; i++) {
            TextMessage message = session.createTextMessage("" + i);
            producer.send(message);
        }

        DestinationViewMBean view = createView(destination);
        // not expired yet...
        assertEquals("Incorrect enqueue count", count, view.getEnqueueCount());

        // close consumer so topic thinks consumer is inactive
        consumer.close();

        // Let the messages reach an expiry time
        Thread.sleep(2000);

        assertEquals("Incorrect inflight count: " + view.getInFlightCount(), 0, view.getInFlightCount());
        assertEquals("Incorrect queue size count", 0, view.getQueueSize());
        assertEquals("Incorrect expired size count", view.getEnqueueCount(), view.getExpiredCount());
    }

    /**
     * Returns a JMX proxy for the test destination. The object name is always
     * built from {@code destinationName}; only the {@code destinationType}
     * (Queue or Topic) is taken from the argument.
     */
    protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {
        String domain = "org.apache.activemq";
        String destinationType = destination.isQueue() ? "Queue" : "Topic";
        ObjectName name = new ObjectName(domain + ":type=Broker,brokerName=" + brokerName + ",destinationType=" + destinationType + ",destinationName=" + destinationName);
        return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
    }
}
deleted file mode 100644 index e7d22b1..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MessageSender.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -public class MessageSender { - - private MessageProducer producer; - private Session session; - - public MessageSender(String queueName, - Connection connection, - boolean useTransactedSession, - boolean topic) throws Exception { - session = useTransactedSession ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producer = session.createProducer(topic ? 
session.createTopic(queueName) : session.createQueue(queueName)); - } - - public void send(String payload) throws Exception { - ObjectMessage message = session.createObjectMessage(); - message.setObject(payload); - producer.send(message); - if (session.getTransacted()) { - session.commit(); - } - } - - public MessageProducer getProducer() { - return producer; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java deleted file mode 100644 index b278dc9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/MissingDataFileTest.java +++ /dev/null @@ -1,333 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.ObjectMessage; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.usage.SystemUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/* - * Try and replicate: - * Caused by: java.io.IOException: Could not locate data file data--188 - * at org.apache.activemq.kaha.impl.async.AsyncDataManager.getDataFile(AsyncDataManager.java:302) - * at org.apache.activemq.kaha.impl.async.AsyncDataManager.read(AsyncDataManager.java:614) - * at org.apache.activemq.store.amq.AMQPersistenceAdapter.readCommand(AMQPersistenceAdapter.java:523) - */ - -public class MissingDataFileTest extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(MissingDataFileTest.class); - - private static int counter = 500; - - private static int hectorToHaloCtr; - private static int xenaToHaloCtr; - private static int troyToHaloCtr; - - private static int haloToHectorCtr; - private static int haloToXenaCtr; - private static int haloToTroyCtr; - - private final String hectorToHalo = "hectorToHalo"; - private final String xenaToHalo = "xenaToHalo"; - private final String troyToHalo = "troyToHalo"; - - private final String haloToHector = "haloToHector"; - private final String haloToXena = "haloToXena"; - private final String haloToTroy = "haloToTroy"; - - private BrokerService broker; - - private Connection hectorConnection; - private Connection xenaConnection; - private Connection troyConnection; - private Connection haloConnection; - - private final Object lock = new Object(); - final boolean useTopic = false; - final boolean useSleep = true; - - 
protected static final String payload = new String(new byte[500]); - - public Connection createConnection() throws JMSException { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); - return factory.createConnection(); - } - - public Session createSession(Connection connection, boolean transacted) throws JMSException { - return connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); - } - - public void startBroker() throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setPersistent(true); - broker.setUseJmx(true); - broker.addConnector("tcp://localhost:61616").setName("Default"); - - SystemUsage systemUsage; - systemUsage = new SystemUsage(); - systemUsage.getMemoryUsage().setLimit(10 * 1024 * 1024); // Just a few messags - broker.setSystemUsage(systemUsage); - - KahaDBPersistenceAdapter kahaDBPersistenceAdapter = new KahaDBPersistenceAdapter(); - kahaDBPersistenceAdapter.setJournalMaxFileLength(16 * 1024); - kahaDBPersistenceAdapter.setCleanupInterval(500); - broker.setPersistenceAdapter(kahaDBPersistenceAdapter); - - broker.start(); - LOG.info("Starting broker.."); - } - - @Override - public void tearDown() throws Exception { - hectorConnection.close(); - xenaConnection.close(); - troyConnection.close(); - haloConnection.close(); - broker.stop(); - } - - public void testForNoDataFoundError() throws Exception { - - startBroker(); - hectorConnection = createConnection(); - Thread hectorThread = buildProducer(hectorConnection, hectorToHalo, false, useTopic); - Receiver hHectorReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - haloToHectorCtr++; - if (haloToHectorCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - possiblySleep(haloToHectorCtr); - } - }; - buildReceiver(hectorConnection, haloToHector, false, hHectorReceiver, useTopic); - - troyConnection = createConnection(); - Thread troyThread = 
buildProducer(troyConnection, troyToHalo); - Receiver hTroyReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - haloToTroyCtr++; - if (haloToTroyCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - possiblySleep(haloToTroyCtr); - } - }; - buildReceiver(hectorConnection, haloToTroy, false, hTroyReceiver, false); - - xenaConnection = createConnection(); - Thread xenaThread = buildProducer(xenaConnection, xenaToHalo); - Receiver hXenaReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - haloToXenaCtr++; - if (haloToXenaCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - possiblySleep(haloToXenaCtr); - } - }; - buildReceiver(xenaConnection, haloToXena, false, hXenaReceiver, false); - - haloConnection = createConnection(); - final MessageSender hectorSender = buildTransactionalProducer(haloToHector, haloConnection, false); - final MessageSender troySender = buildTransactionalProducer(haloToTroy, haloConnection, false); - final MessageSender xenaSender = buildTransactionalProducer(haloToXena, haloConnection, false); - Receiver hectorReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - hectorToHaloCtr++; - troySender.send(payload); - if (hectorToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - possiblySleep(hectorToHaloCtr); - } - } - }; - Receiver xenaReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - xenaToHaloCtr++; - hectorSender.send(payload); - if (xenaToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - possiblySleep(xenaToHaloCtr); - } - }; - Receiver troyReceiver = new Receiver() { - @Override - public void receive(String s) throws Exception { - troyToHaloCtr++; - xenaSender.send(payload); - if (troyToHaloCtr >= counter) { - synchronized (lock) { - lock.notifyAll(); - } - } - } - }; - buildReceiver(haloConnection, 
hectorToHalo, true, hectorReceiver, false); - buildReceiver(haloConnection, xenaToHalo, true, xenaReceiver, false); - buildReceiver(haloConnection, troyToHalo, true, troyReceiver, false); - - haloConnection.start(); - - troyConnection.start(); - troyThread.start(); - - xenaConnection.start(); - xenaThread.start(); - - hectorConnection.start(); - hectorThread.start(); - waitForMessagesToBeDelivered(); - // number of messages received should match messages sent - assertEquals(hectorToHaloCtr, counter); - LOG.info("hectorToHalo received " + hectorToHaloCtr + " messages"); - assertEquals(xenaToHaloCtr, counter); - LOG.info("xenaToHalo received " + xenaToHaloCtr + " messages"); - assertEquals(troyToHaloCtr, counter); - LOG.info("troyToHalo received " + troyToHaloCtr + " messages"); - assertEquals(haloToHectorCtr, counter); - LOG.info("haloToHector received " + haloToHectorCtr + " messages"); - assertEquals(haloToXenaCtr, counter); - LOG.info("haloToXena received " + haloToXenaCtr + " messages"); - assertEquals(haloToTroyCtr, counter); - LOG.info("haloToTroy received " + haloToTroyCtr + " messages"); - - } - - protected void possiblySleep(int count) throws InterruptedException { - if (useSleep) { - if (count % 100 == 0) { - Thread.sleep(5000); - } - } - - } - - protected void waitForMessagesToBeDelivered() { - // let's give the listeners enough time to read all messages - long maxWaitTime = counter * 1000; - long waitTime = maxWaitTime; - long start = (maxWaitTime <= 0) ? 
0 : System.currentTimeMillis(); - - synchronized (lock) { - boolean hasMessages = true; - while (hasMessages && waitTime >= 0) { - try { - lock.wait(200); - } - catch (InterruptedException e) { - LOG.error(e.toString()); - } - // check if all messages have been received - hasMessages = hectorToHaloCtr < counter || xenaToHaloCtr < counter || troyToHaloCtr < counter || haloToHectorCtr < counter || haloToXenaCtr < counter || haloToTroyCtr < counter; - waitTime = maxWaitTime - (System.currentTimeMillis() - start); - } - } - } - - public MessageSender buildTransactionalProducer(String queueName, - Connection connection, - boolean isTopic) throws Exception { - - return new MessageSender(queueName, connection, true, isTopic); - } - - public Thread buildProducer(Connection connection, final String queueName) throws Exception { - return buildProducer(connection, queueName, false, false); - } - - public Thread buildProducer(Connection connection, - final String queueName, - boolean transacted, - boolean isTopic) throws Exception { - final MessageSender producer = new MessageSender(queueName, connection, transacted, isTopic); - Thread thread = new Thread() { - @Override - public synchronized void run() { - for (int i = 0; i < counter; i++) { - try { - producer.send(payload); - } - catch (Exception e) { - throw new RuntimeException("on " + queueName + " send", e); - } - } - } - }; - return thread; - } - - public void buildReceiver(Connection connection, - final String queueName, - boolean transacted, - final Receiver receiver, - boolean isTopic) throws Exception { - final Session session = transacted ? connection.createSession(true, Session.SESSION_TRANSACTED) : connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer inputMessageConsumer = session.createConsumer(isTopic ? 
session.createTopic(queueName) : session.createQueue(queueName)); - MessageListener messageListener = new MessageListener() { - - @Override - public void onMessage(Message message) { - try { - ObjectMessage objectMessage = (ObjectMessage) message; - String s = (String) objectMessage.getObject(); - receiver.receive(s); - if (session.getTransacted()) { - session.commit(); - } - - } - catch (Exception e) { - e.printStackTrace(); - } - } - }; - inputMessageConsumer.setMessageListener(messageListener); - } - -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java deleted file mode 100644 index 4bc92ad..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OptimizeAcknowledgeWithExpiredMsgsTest.java +++ /dev/null @@ -1,309 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.atomic.AtomicInteger; - -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.Destination; -import javax.jms.ExceptionListener; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test for AMQ-3965. - * A consumer may be stalled in case it uses optimizeAcknowledge and receives - * a number of messages that expire before being dispatched to application code. - * See for more details. - */ -public class OptimizeAcknowledgeWithExpiredMsgsTest { - - private final static Logger LOG = LoggerFactory.getLogger(OptimizeAcknowledgeWithExpiredMsgsTest.class); - - private BrokerService broker = null; - - private String connectionUri; - - /** - * Creates a broker instance but does not start it. 
- * - * @param brokerUri - transport uri of broker - * @param brokerName - name for the broker - * @return a BrokerService instance with transport uri and broker name set - * @throws Exception - */ - protected BrokerService createBroker() throws Exception { - BrokerService broker = new BrokerService(); - broker.setPersistent(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.setUseJmx(false); - connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); - return broker; - } - - @Before - public void setUp() throws Exception { - broker = createBroker(); - broker.start(); - broker.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - broker = null; - } - } - - /** - * Tests for AMQ-3965 - * Creates connection into broker using optimzeAcknowledge and prefetch=100 - * Creates producer and consumer. Producer sends 45 msgs that will expire - * at consumer (but before being dispatched to app code). - * Producer then sends 60 msgs without expiry. - * - * Consumer receives msgs using a MessageListener and increments a counter. - * Main thread sleeps for 5 seconds and checks the counter value. - * If counter != 60 msgs (the number of msgs that should get dispatched - * to consumer) the test fails. 
- */ - @Test - public void testOptimizedAckWithExpiredMsgs() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); - - // Create JMS resources - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("TEST.FOO"); - - // ***** Consumer code ***** - MessageConsumer consumer = session.createConsumer(destination); - - final MyMessageListener listener = new MyMessageListener(); - connection.setExceptionListener(listener); - - // ***** Producer Code ***** - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode(); - TextMessage message; - - // Produce msgs that will expire quickly - for (int i = 0; i < 45; i++) { - message = session.createTextMessage(text); - producer.send(message, 1, 1, 100); - LOG.trace("Sent message: " + message.getJMSMessageID() + - " with expiry 10 msec"); - } - // Produce msgs that don't expire - for (int i = 0; i < 60; i++) { - message = session.createTextMessage(text); - producer.send(message, 1, 1, 60000); - // producer.send(message); - LOG.trace("Sent message: " + message.getJMSMessageID() + - " with expiry 30 sec"); - } - consumer.setMessageListener(listener); - - sleep(1000); // let the batch of 45 expire. 
- - connection.start(); - - assertTrue("Should receive all expected messages, counter at " + listener.getCounter(), Wait.waitFor(new Wait.Condition() { - - @Override - public boolean isSatisified() throws Exception { - return listener.getCounter() == 60; - } - })); - - LOG.info("Received all expected messages with counter at: " + listener.getCounter()); - - // Cleanup - producer.close(); - consumer.close(); - session.close(); - connection.close(); - } - - @Test - public void testOptimizedAckWithExpiredMsgsSync() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); - - // Create JMS resources - Connection connection = connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("TEST.FOO"); - - // ***** Consumer code ***** - MessageConsumer consumer = session.createConsumer(destination); - - // ***** Producer Code ***** - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - String text = "Hello world! 
From: " + Thread.currentThread().getName() + " : " + this.hashCode(); - TextMessage message; - - // Produce msgs that will expire quickly - for (int i = 0; i < 45; i++) { - message = session.createTextMessage(text); - producer.send(message, 1, 1, 10); - LOG.trace("Sent message: " + message.getJMSMessageID() + - " with expiry 10 msec"); - } - // Produce msgs that don't expire - for (int i = 0; i < 60; i++) { - message = session.createTextMessage(text); - producer.send(message, 1, 1, 30000); - // producer.send(message); - LOG.trace("Sent message: " + message.getJMSMessageID() + - " with expiry 30 sec"); - } - sleep(200); - - int counter = 1; - for (; counter <= 60; ++counter) { - assertNotNull(consumer.receive(2000)); - LOG.info("counter at " + counter); - } - LOG.info("Received all expected messages with counter at: " + counter); - - // Cleanup - producer.close(); - consumer.close(); - session.close(); - connection.close(); - } - - @Test - public void testOptimizedAckWithExpiredMsgsSync2() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + "?jms.optimizeAcknowledge=true&jms.prefetchPolicy.all=100"); - - // Create JMS resources - Connection connection = connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue("TEST.FOO"); - - // ***** Consumer code ***** - MessageConsumer consumer = session.createConsumer(destination); - - // ***** Producer Code ***** - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - - String text = "Hello world! 
From: " + Thread.currentThread().getName() + " : " + this.hashCode(); - TextMessage message; - - // Produce msgs that don't expire - for (int i = 0; i < 56; i++) { - message = session.createTextMessage(text); - producer.send(message, 1, 1, 30000); - // producer.send(message); - LOG.trace("Sent message: " + message.getJMSMessageID() + - " with expiry 30 sec"); - } - // Produce msgs that will expire quickly - for (int i = 0; i < 44; i++) { - message = session.createTextMessage(text); - producer.send(message, 1, 1, 10); - LOG.trace("Sent message: " + message.getJMSMessageID() + - " with expiry 10 msec"); - } - // Produce some moremsgs that don't expire - for (int i = 0; i < 4; i++) { - message = session.createTextMessage(text); - producer.send(message, 1, 1, 30000); - // producer.send(message); - LOG.trace("Sent message: " + message.getJMSMessageID() + - " with expiry 30 sec"); - } - - sleep(200); - - int counter = 1; - for (; counter <= 60; ++counter) { - assertNotNull(consumer.receive(2000)); - LOG.info("counter at " + counter); - } - LOG.info("Received all expected messages with counter at: " + counter); - - // Cleanup - producer.close(); - consumer.close(); - session.close(); - connection.close(); - } - - private void sleep(int milliSecondTime) { - try { - Thread.sleep(milliSecondTime); - } - catch (InterruptedException igonred) { - } - } - - /** - * Standard JMS MessageListener - */ - private class MyMessageListener implements MessageListener, ExceptionListener { - - private AtomicInteger counter = new AtomicInteger(0); - - @Override - public void onMessage(final Message message) { - try { - LOG.trace("Got Message " + message.getJMSMessageID()); - LOG.info("counter at " + counter.incrementAndGet()); - } - catch (final Exception e) { - } - } - - public int getCounter() { - return counter.get(); - } - - @Override - public synchronized void onException(JMSException ex) { - LOG.error("JMS Exception occurred. 
Shutting down client."); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java deleted file mode 100644 index 2b84862..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/OutOfOrderTestCase.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.activemq.bugs; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class OutOfOrderTestCase extends TestCase { - - private static final Logger log = LoggerFactory.getLogger(OutOfOrderTestCase.class); - - private static final String BROKER_URL = "tcp://localhost:0"; - private static final int PREFETCH = 10; - private static final String CONNECTION_URL_OPTIONS = "?jms.prefetchPolicy.all=" + PREFETCH; - - private static final String DESTINATION = "QUEUE?consumer.exclusive=true"; - - private BrokerService brokerService; - private Session session; - private Connection connection; - private String connectionUri; - - private int seq = 0; - - @Override - public void setUp() throws Exception { - brokerService = new BrokerService(); - brokerService.setUseJmx(true); - brokerService.addConnector(BROKER_URL); - brokerService.deleteAllMessages(); - brokerService.start(); - brokerService.waitUntilStarted(); - - connectionUri = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); - - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri + CONNECTION_URL_OPTIONS); - connection = connectionFactory.createConnection(); - connection.start(); - session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - } - - @Override - protected void tearDown() throws Exception { - session.close(); - connection.close(); - brokerService.stop(); - } - - public void testOrder() throws Exception { - - log.info("Producing messages 0-29 . . 
."); - Destination destination = session.createQueue(DESTINATION); - final MessageProducer messageProducer = session.createProducer(destination); - try { - for (int i = 0; i < 30; ++i) { - final Message message = session.createTextMessage(createMessageText(i)); - message.setStringProperty("JMSXGroupID", "FOO"); - - messageProducer.send(message); - log.info("sent " + toString(message)); - } - } - finally { - messageProducer.close(); - } - - log.info("Consuming messages 0-9 . . ."); - consumeBatch(); - - log.info("Consuming messages 10-19 . . ."); - consumeBatch(); - - log.info("Consuming messages 20-29 . . ."); - consumeBatch(); - } - - protected void consumeBatch() throws Exception { - Destination destination = session.createQueue(DESTINATION); - final MessageConsumer messageConsumer = session.createConsumer(destination); - try { - for (int i = 0; i < 10; ++i) { - final Message message = messageConsumer.receive(1000L); - log.info("received " + toString(message)); - assertEquals("Message out of order", createMessageText(seq++), ((TextMessage) message).getText()); - message.acknowledge(); - } - } - finally { - messageConsumer.close(); - } - } - - private String toString(final Message message) throws JMSException { - String ret = "received message '" + ((TextMessage) message).getText() + "' - " + message.getJMSMessageID(); - if (message.getJMSRedelivered()) - ret += " (redelivered)"; - return ret; - - } - - private static String createMessageText(final int index) { - return "message #" + index; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java deleted file mode 100644 index 
95057b9..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/QueueWorkerPrefetchTest.java +++ /dev/null @@ -1,267 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.io.Serializable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; -import org.apache.activemq.broker.BrokerService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test case demonstrating situation where messages are not delivered to - * consumers. 
- */ -public class QueueWorkerPrefetchTest extends TestCase implements MessageListener { - - private static final Logger LOG = LoggerFactory.getLogger(QueueWorkerPrefetchTest.class); - private static final int BATCH_SIZE = 10; - private static final long WAIT_TIMEOUT = 1000 * 10; - - /** - * The connection URL. - */ - private static final String BROKER_BIND_ADDRESS = "tcp://localhost:0"; - - /** - * The queue prefetch size to use. A value greater than 1 seems to make - * things work. - */ - private static final int QUEUE_PREFETCH_SIZE = 1; - - /** - * The number of workers to use. A single worker with a prefetch of 1 works. - */ - private static final int NUM_WORKERS = 2; - - /** - * Embedded JMS broker. - */ - private BrokerService broker; - - /** - * The master's producer object for creating work items. - */ - private MessageProducer workItemProducer; - - /** - * The master's consumer object for consuming ack messages from workers. - */ - private MessageConsumer masterItemConsumer; - - /** - * The number of acks received by the master. - */ - private final AtomicLong acksReceived = new AtomicLong(0); - - private final AtomicReference<CountDownLatch> latch = new AtomicReference<>(); - - private String connectionUri; - - /** - * Messages sent to the work-item queue. - */ - private static class WorkMessage implements Serializable { - - private static final long serialVersionUID = 1L; - private final int id; - - public WorkMessage(int id) { - this.id = id; - } - - @Override - public String toString() { - return "Work: " + id; - } - } - - /** - * The worker process. Consume messages from the work-item queue, possibly - * creating more messages to submit to the work-item queue. For each work - * item, send an ack to the master. - */ - private static class Worker implements MessageListener { - - /** - * Counter shared between workers to decided when new work-item messages - * are created. 
- */ - private static AtomicInteger counter = new AtomicInteger(0); - - /** - * Session to use. - */ - private Session session; - - /** - * Producer for sending ack messages to the master. - */ - private MessageProducer masterItemProducer; - - /** - * Producer for sending new work items to the work-items queue. - */ - private MessageProducer workItemProducer; - - public Worker(Session session) throws JMSException { - this.session = session; - masterItemProducer = session.createProducer(session.createQueue("master-item")); - Queue workItemQueue = session.createQueue("work-item"); - workItemProducer = session.createProducer(workItemQueue); - MessageConsumer workItemConsumer = session.createConsumer(workItemQueue); - workItemConsumer.setMessageListener(this); - } - - @Override - public void onMessage(javax.jms.Message message) { - try { - WorkMessage work = (WorkMessage) ((ObjectMessage) message).getObject(); - - long c = counter.incrementAndGet(); - - // Don't create a new work item for every BATCH_SIZE message. */ - if (c % BATCH_SIZE != 0) { - // Send new work item to work-item queue. - workItemProducer.send(session.createObjectMessage(new WorkMessage(work.id + 1))); - } - - // Send ack to master. - masterItemProducer.send(session.createObjectMessage(work)); - } - catch (JMSException e) { - throw new IllegalStateException("Something has gone wrong", e); - } - } - - /** - * Close of JMS resources used by worker. - */ - public void close() throws JMSException { - masterItemProducer.close(); - workItemProducer.close(); - session.close(); - } - } - - /** - * Master message handler. Process ack messages. - */ - @Override - public void onMessage(javax.jms.Message message) { - long acks = acksReceived.incrementAndGet(); - latch.get().countDown(); - if (acks % 1 == 0) { - LOG.info("Master now has ack count of: " + acksReceived); - } - } - - @Override - protected void setUp() throws Exception { - // Create the message broker. 
- super.setUp(); - broker = new BrokerService(); - broker.setPersistent(false); - broker.setUseJmx(true); - broker.addConnector(BROKER_BIND_ADDRESS); - broker.start(); - broker.waitUntilStarted(); - - connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); - } - - @Override - protected void tearDown() throws Exception { - // Shut down the message broker. - broker.deleteAllMessages(); - broker.stop(); - super.tearDown(); - } - - public void testActiveMQ() throws Exception { - // Create the connection to the broker. - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); - ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); - prefetchPolicy.setQueuePrefetch(QUEUE_PREFETCH_SIZE); - connectionFactory.setPrefetchPolicy(prefetchPolicy); - Connection connection = connectionFactory.createConnection(); - connection.start(); - - Session masterSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - workItemProducer = masterSession.createProducer(masterSession.createQueue("work-item")); - masterItemConsumer = masterSession.createConsumer(masterSession.createQueue("master-item")); - masterItemConsumer.setMessageListener(this); - - // Create the workers. - Worker[] workers = new Worker[NUM_WORKERS]; - for (int i = 0; i < NUM_WORKERS; i++) { - workers[i] = new Worker(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); - } - - // Send a message to the work queue, and wait for the BATCH_SIZE acks - // from the workers. - acksReceived.set(0); - latch.set(new CountDownLatch(BATCH_SIZE)); - workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1))); - - if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { - fail("First batch only received " + acksReceived + " messages"); - } - - LOG.info("First batch received"); - - // Send another message to the work queue, and wait for the next 1000 acks. 
It is - // at this point where the workers never get notified of this message, as they - // have a large pending queue. Creating a new worker at this point however will - // receive this new message. - acksReceived.set(0); - latch.set(new CountDownLatch(BATCH_SIZE)); - workItemProducer.send(masterSession.createObjectMessage(new WorkMessage(1))); - - if (!latch.get().await(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) { - fail("Second batch only received " + acksReceived + " messages"); - } - - LOG.info("Second batch received"); - - // Cleanup all JMS resources. - for (int i = 0; i < NUM_WORKERS; i++) { - workers[i].close(); - } - masterSession.close(); - connection.close(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java deleted file mode 100644 index 549922d..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackSharedConsumerTests.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class RawRollbackSharedConsumerTests { - - private static ConnectionFactory connectionFactory; - private static Destination queue; - private static BrokerService broker; - - @BeforeClass - public static void clean() throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setUseJmx(true); - broker.start(); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); - connectionFactory.setBrokerURL("vm://localhost?async=false"); - RawRollbackSharedConsumerTests.connectionFactory = connectionFactory; - queue = new ActiveMQQueue("queue"); - } - - @AfterClass - public static void close() throws Exception { - broker.stop(); - } - - @Before - public void clearData() throws Exception { - getMessages(false); // drain queue - convertAndSend("foo"); - 
convertAndSend("bar"); - } - - @After - public void checkPostConditions() throws Exception { - - Thread.sleep(1000L); - List<String> list = getMessages(false); - assertEquals(2, list.size()); - - } - - @Test - public void testReceiveMessages() throws Exception { - - List<String> list = getMessages(true); - assertEquals(2, list.size()); - assertTrue(list.contains("foo")); - - } - - private void convertAndSend(String msg) throws Exception { - Connection connection = connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(queue); - producer.send(session.createTextMessage(msg)); - producer.close(); - session.commit(); - session.close(); - connection.close(); - } - - private List<String> getMessages(boolean rollback) throws Exception { - Connection connection = connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - String next = ""; - List<String> msgs = new ArrayList<>(); - MessageConsumer consumer = session.createConsumer(queue); - while (next != null) { - next = receiveAndConvert(consumer); - if (next != null) - msgs.add(next); - } - consumer.close(); - if (rollback) { - session.rollback(); - } - else { - session.commit(); - } - session.close(); - connection.close(); - return msgs; - } - - private String receiveAndConvert(MessageConsumer consumer) throws Exception { - Message message = consumer.receive(100L); - if (message == null) { - return null; - } - return ((TextMessage) message).getText(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java ---------------------------------------------------------------------- diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java deleted file mode 100644 index 74437b7..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/RawRollbackTests.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.ArrayList; -import java.util.List; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -public class RawRollbackTests { - - private static ConnectionFactory connectionFactory; - private static Destination queue; - private static BrokerService broker; - - @BeforeClass - public static void clean() throws Exception { - broker = new BrokerService(); - broker.setDeleteAllMessagesOnStartup(true); - broker.setUseJmx(true); - broker.start(); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); - connectionFactory.setBrokerURL("vm://localhost?async=false&waitForStart=5000&jms.prefetchPolicy.all=0"); - RawRollbackTests.connectionFactory = connectionFactory; - queue = new ActiveMQQueue("queue"); - } - - @AfterClass - public static void close() throws Exception { - broker.stop(); - } - - @Before - public void clearData() throws Exception { - getMessages(false); // drain queue - convertAndSend("foo"); - convertAndSend("bar"); - } - - @After - public void checkPostConditions() throws Exception { - - Thread.sleep(1000L); - List<String> list = getMessages(false); - assertEquals(2, list.size()); - - } - - @Test - public void testReceiveMessages() throws Exception { - - List<String> list = getMessages(true); - assertEquals(2, list.size()); - assertTrue(list.contains("foo")); - - } - - private void 
convertAndSend(String msg) throws Exception { - Connection connection = connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(queue); - producer.send(session.createTextMessage(msg)); - producer.close(); - session.commit(); - session.close(); - connection.close(); - } - - private List<String> getMessages(boolean rollback) throws Exception { - Connection connection = connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - String next = ""; - List<String> msgs = new ArrayList<>(); - while (next != null) { - next = receiveAndConvert(session); - if (next != null) - msgs.add(next); - } - if (rollback) { - session.rollback(); - } - else { - session.commit(); - } - session.close(); - connection.close(); - return msgs; - } - - private String receiveAndConvert(Session session) throws Exception { - MessageConsumer consumer = session.createConsumer(queue); - Message message = consumer.receive(100L); - consumer.close(); - if (message == null) { - return null; - } - return ((TextMessage) message).getText(); - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java deleted file mode 100644 index e6d1d40..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/Receiver.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -public interface Receiver { - - void receive(String s) throws Exception; -}