http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java deleted file mode 100644 index 657d7a2..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4887Test.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.StreamMessage; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4887Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4887Test.class); - private static final Integer ITERATIONS = 10; - - @Rule - public TestName name = new TestName(); - - @Test - public void testBytesMessageSetPropertyBeforeCopy() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - doTestBytesMessageSetPropertyBeforeCopy(connection); - } - - @Test - public void testBytesMessageSetPropertyBeforeCopyCompressed() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); - connectionFactory.setUseCompression(true); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - doTestBytesMessageSetPropertyBeforeCopy(connection); - } - - public void doTestBytesMessageSetPropertyBeforeCopy(Connection connection) throws Exception { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(name.toString()); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - BytesMessage message = session.createBytesMessage(); - - for (int i = 0; i < ITERATIONS; i++) { - - long sendTime = System.currentTimeMillis(); - message.setLongProperty("sendTime", sendTime); - producer.send(message); - - LOG.debug("Receiving message " + i); - Message receivedMessage = consumer.receive(5000); - assertNotNull("On message " + i, receivedMessage); - assertTrue("On message " + i, receivedMessage instanceof BytesMessage); - - BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage; - - int numElements = 0; - try { - while (true) { - receivedBytesMessage.readBoolean(); - numElements++; - } - } - catch (Exception ex) { - } - - LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements); - assertEquals(i, numElements); - - long receivedSendTime = receivedBytesMessage.getLongProperty("sendTime"); - assertEquals("On message " + i, receivedSendTime, sendTime); - - // Add a new bool value on each iteration. - message.writeBoolean(true); - } - } - - @Test - public void testStreamMessageSetPropertyBeforeCopy() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - doTestStreamMessageSetPropertyBeforeCopy(connection); - } - - @Test - public void testStreamMessageSetPropertyBeforeCopyCompressed() throws Exception { - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); - connectionFactory.setUseCompression(true); - ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); - connection.start(); - doTestStreamMessageSetPropertyBeforeCopy(connection); - } - - public void doTestStreamMessageSetPropertyBeforeCopy(Connection connection) throws Exception { - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination destination = session.createQueue(name.toString()); - MessageConsumer consumer = session.createConsumer(destination); - MessageProducer producer = session.createProducer(destination); - - StreamMessage message = session.createStreamMessage(); - - for (int i = 0; i < ITERATIONS; i++) { - - long sendTime = System.currentTimeMillis(); - message.setLongProperty("sendTime", sendTime); - producer.send(message); - - LOG.debug("Receiving message " + i); - Message receivedMessage = consumer.receive(5000); - assertNotNull("On message " + i, receivedMessage); - assertTrue("On message " + i, receivedMessage instanceof StreamMessage); - - StreamMessage receivedStreamMessage = (StreamMessage) receivedMessage; - - int numElements = 0; - try { - while (true) { - receivedStreamMessage.readBoolean(); - numElements++; - } - } - catch (Exception ex) { - } - - LOG.info("Iteration [{}]: Received Message contained {} boolean values.", i, numElements); - assertEquals(i, numElements); - - long receivedSendTime = receivedStreamMessage.getLongProperty("sendTime"); - assertEquals("On message " + i, receivedSendTime, sendTime); - - // Add a new bool value on each iteration. - message.writeBoolean(true); - } - } - -}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java deleted file mode 100644 index ba65ab7..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4893Test.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.io.IOException; -import java.util.Map; - -import javax.jms.JMSException; - -import org.apache.activemq.command.ActiveMQObjectMessage; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.util.ByteSequence; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4893Test { - - private static final transient Logger LOG = LoggerFactory.getLogger(AMQ4893Test.class); - - @Test - public void testPropertiesInt() throws Exception { - ActiveMQObjectMessage message = new ActiveMQObjectMessage(); - message.setIntProperty("TestProp", 333); - fakeUnmarshal(message); - roundTripProperties(message); - } - - @Test - public void testPropertiesString() throws Exception { - ActiveMQObjectMessage message = new ActiveMQObjectMessage(); - message.setStringProperty("TestProp", "Value"); - fakeUnmarshal(message); - roundTripProperties(message); - } - - @Test - public void testPropertiesObject() throws Exception { - ActiveMQObjectMessage message = new ActiveMQObjectMessage(); - message.setObjectProperty("TestProp", "Value"); - fakeUnmarshal(message); - roundTripProperties(message); - } - - @Test - public void testPropertiesObjectNoMarshalling() throws Exception { - ActiveMQObjectMessage message = new ActiveMQObjectMessage(); - message.setObjectProperty("TestProp", "Value"); - roundTripProperties(message); - } - - private void roundTripProperties(ActiveMQObjectMessage message) throws IOException, JMSException { - ActiveMQObjectMessage copy = new ActiveMQObjectMessage(); - for (Map.Entry<String, Object> prop : message.getProperties().entrySet()) { - LOG.debug("{} -> {}", prop.getKey(), prop.getValue().getClass()); - copy.setObjectProperty(prop.getKey(), prop.getValue()); - } - } - - private void fakeUnmarshal(ActiveMQObjectMessage message) throws IOException { - // we need to force the unmarshalled property field to be set so it - // gives us a hawtbuffer for the string - OpenWireFormat format = new OpenWireFormat(); - message.beforeMarshall(format); - message.afterMarshall(format); - - ByteSequence seq = message.getMarshalledProperties(); - message.clearProperties(); - message.setMarshalledProperties(seq); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java deleted file mode 100644 index fe336eb..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4899Test.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.region.DestinationInterceptor; -import org.apache.activemq.broker.region.virtual.VirtualDestination; -import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; -import org.apache.activemq.broker.region.virtual.VirtualTopic; -import org.apache.activemq.plugin.SubQueueSelectorCacheBrokerPlugin; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.jms.Connection; -import javax.jms.Destination; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; -import javax.jms.TextMessage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; - -public class AMQ4899Test { - - protected static final Logger LOG = LoggerFactory.getLogger(AMQ4899Test.class); - private static final String QUEUE_NAME = "AMQ4899TestQueue"; - private static final String CONSUMER_QUEUE = "Consumer.Orders.VirtualOrders." + QUEUE_NAME; - private static final String PRODUCER_DESTINATION_NAME = "VirtualOrders." + QUEUE_NAME; - - private static final Integer MESSAGE_LIMIT = 20; - public static final String CONSUMER_A_SELECTOR = "Order < " + 10; - public static String CONSUMER_B_SELECTOR = "Order >= " + 10; - private CountDownLatch consumersStarted = new CountDownLatch(2); - private CountDownLatch consumerAtoConsumeCount = new CountDownLatch(10); - private CountDownLatch consumerBtoConsumeCount = new CountDownLatch(10); - - private BrokerService broker; - - @Before - public void setUp() { - setupBroker("broker://()/localhost?"); - } - - @After - public void tearDown() throws Exception { - if (broker != null) { - broker.stop(); - broker.waitUntilStopped(); - } - } - - @Test(timeout = 60 * 1000) - public void testVirtualTopicMultipleSelectors() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - Queue consumerQueue = session.createQueue(CONSUMER_QUEUE); - - MessageListener listenerA = new AMQ4899Listener("A", consumersStarted, consumerAtoConsumeCount); - MessageConsumer consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR); - consumerA.setMessageListener(listenerA); - - MessageListener listenerB = new AMQ4899Listener("B", consumersStarted, consumerBtoConsumeCount); - MessageConsumer consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR); - consumerB.setMessageListener(listenerB); - - consumersStarted.await(10, TimeUnit.SECONDS); - assertEquals("Not all consumers started in time", 0, consumersStarted.getCount()); - - Destination producerDestination = session.createTopic(PRODUCER_DESTINATION_NAME); - MessageProducer producer = session.createProducer(producerDestination); - int messageIndex = 0; - for (int i = 0; i < MESSAGE_LIMIT; i++) { - if (i == 3) { - LOG.debug("Stopping consumerA"); - consumerA.close(); - } - - if (i == 14) { - LOG.debug("Stopping consumer B"); - consumerB.close(); - } - String messageText = "hello " + messageIndex++ + " sent at " + new java.util.Date().toString(); - TextMessage message = session.createTextMessage(messageText); - message.setIntProperty("Order", i); - LOG.debug("Sending message [{}]", messageText); - producer.send(message); - Thread.sleep(100); - } - Thread.sleep(1 * 1000); - - // restart consumerA - LOG.debug("Restarting consumerA"); - consumerA = session.createConsumer(consumerQueue, CONSUMER_A_SELECTOR); - consumerA.setMessageListener(listenerA); - - // restart consumerB - LOG.debug("restarting consumerB"); - consumerB = session.createConsumer(consumerQueue, CONSUMER_B_SELECTOR); - consumerB.setMessageListener(listenerB); - - consumerAtoConsumeCount.await(5, TimeUnit.SECONDS); - consumerBtoConsumeCount.await(5, TimeUnit.SECONDS); - - LOG.debug("Unconsumed messages for consumerA {} consumerB {}", consumerAtoConsumeCount.getCount(), consumerBtoConsumeCount.getCount()); - - assertEquals("Consumer A did not consume all messages", 0, consumerAtoConsumeCount.getCount()); - assertEquals("Consumer B did not consume all messages", 0, consumerBtoConsumeCount.getCount()); - - connection.close(); - } - - /** - * Setup broker with VirtualTopic configured - */ - private void setupBroker(String uri) { - try { - broker = BrokerFactory.createBroker(uri); - - VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); - VirtualTopic virtualTopic = new VirtualTopic(); - virtualTopic.setName("VirtualOrders.>"); - virtualTopic.setSelectorAware(true); - VirtualDestination[] virtualDestinations = {virtualTopic}; - interceptor.setVirtualDestinations(virtualDestinations); - broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); - - SubQueueSelectorCacheBrokerPlugin subQueueSelectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin(); - BrokerPlugin[] updatedPlugins = {subQueueSelectorCacheBrokerPlugin}; - broker.setPlugins(updatedPlugins); - - broker.start(); - broker.waitUntilStarted(); - } - catch (Exception e) { - LOG.error("Failed creating broker", e); - } - } -} - -class AMQ4899Listener implements MessageListener { - - Logger LOG = LoggerFactory.getLogger(AMQ4899Listener.class); - CountDownLatch toConsume; - String id; - - public AMQ4899Listener(String id, CountDownLatch started, CountDownLatch toConsume) { - this.id = id; - this.toConsume = toConsume; - started.countDown(); - } - - @Override - public void onMessage(Message message) { - toConsume.countDown(); - try { - if (message instanceof TextMessage) { - TextMessage textMessage = (TextMessage) message; - LOG.debug("Listener {} received [{}]", id, textMessage.getText()); - } - else { - LOG.error("Listener {} Expected a TextMessage, got {}", id, message.getClass().getCanonicalName()); - } - } - catch (JMSException e) { - LOG.error("Unexpected JMSException in Listener " + id, e); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java deleted file mode 100644 index 4805873..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4930Test.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.DeliveryMode; -import javax.jms.MessageProducer; -import javax.jms.Session; - -import junit.framework.TestCase; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.QueueViewMBean; -import org.apache.activemq.broker.region.Queue; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.Message; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class AMQ4930Test extends TestCase { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4930Test.class); - final int messageCount = 150; - final int messageSize = 1024 * 1024; - final int maxBrowsePageSize = 50; - final ActiveMQQueue bigQueue = new ActiveMQQueue("BIG"); - BrokerService broker; - ActiveMQConnectionFactory factory; - - protected void configureBroker() throws Exception { - broker.setDeleteAllMessagesOnStartup(true); - broker.setAdvisorySupport(false); - broker.getSystemUsage().getMemoryUsage().setLimit(1 * 1024 * 1024); - - PolicyMap pMap = new PolicyMap(); - PolicyEntry policy = new PolicyEntry(); - // disable expriy processing as this will call browse in parallel - policy.setExpireMessagesPeriod(0); - policy.setMaxPageSize(maxBrowsePageSize); - policy.setMaxBrowsePageSize(maxBrowsePageSize); - pMap.setDefaultEntry(policy); - - broker.setDestinationPolicy(pMap); - } - - public void testBrowsePendingNonPersistent() throws Exception { - doTestBrowsePending(DeliveryMode.NON_PERSISTENT); - } - - public void testBrowsePendingPersistent() throws Exception { - doTestBrowsePending(DeliveryMode.PERSISTENT); - } - - public void testWithStatsDisabled() throws Exception { - ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().setEnabled(false); - doTestBrowsePending(DeliveryMode.PERSISTENT); - } - - public void doTestBrowsePending(int deliveryMode) throws Exception { - - Connection connection = factory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(bigQueue); - producer.setDeliveryMode(deliveryMode); - BytesMessage bytesMessage = session.createBytesMessage(); - bytesMessage.writeBytes(new byte[messageSize]); - - for (int i = 0; i < messageCount; i++) { - producer.send(bigQueue, bytesMessage); - } - - final QueueViewMBean queueViewMBean = (QueueViewMBean) broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0], QueueViewMBean.class, false); - - LOG.info(queueViewMBean.getName() + " Size: " + queueViewMBean.getEnqueueCount()); - - connection.close(); - - assertFalse("Cache disabled on q", queueViewMBean.isCacheEnabled()); - - // ensure repeated browse does now blow mem - - final Queue underTest = (Queue) ((RegionBroker) broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(bigQueue); - - // do twice to attempt to pull in 2*maxBrowsePageSize which uses up the system memory limit - Message[] browsed = underTest.browse(); - LOG.info("Browsed: " + browsed.length); - assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length); - browsed = underTest.browse(); - LOG.info("Browsed: " + browsed.length); - assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length); - Runtime.getRuntime().gc(); - long free = Runtime.getRuntime().freeMemory() / 1024; - LOG.info("free at start of check: " + free); - // check for memory growth - for (int i = 0; i < 10; i++) { - LOG.info("free: " + Runtime.getRuntime().freeMemory() / 1024); - browsed = underTest.browse(); - LOG.info("Browsed: " + browsed.length); - assertEquals("maxBrowsePageSize", maxBrowsePageSize, browsed.length); - Runtime.getRuntime().gc(); - Runtime.getRuntime().gc(); - assertTrue("No growth: " + Runtime.getRuntime().freeMemory() / 1024 + " >= " + (free - (free * 0.2)), Runtime.getRuntime().freeMemory() / 1024 >= (free - (free * 0.2))); - } - } - - @Override - protected void setUp() throws Exception { - super.setUp(); - broker = new BrokerService(); - broker.setBrokerName("thisOne"); - configureBroker(); - broker.start(); - factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true"); - factory.setWatchTopicAdvisories(false); - - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - if (broker != null) { - broker.stop(); - broker = null; - } - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java deleted file mode 100644 index 74d0817..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4950Test.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.concurrent.CopyOnWriteArrayList; - -import javax.jms.Message; -import javax.jms.MessageProducer; -import javax.jms.XASession; -import javax.transaction.xa.XAException; -import javax.transaction.xa.XAResource; -import javax.transaction.xa.Xid; - -import org.apache.activemq.ActiveMQXAConnection; -import org.apache.activemq.ActiveMQXAConnectionFactory; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerRegistry; -import org.apache.activemq.broker.BrokerRestartTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.TransactionBroker; -import org.apache.activemq.broker.TransportConnection; -import org.apache.activemq.command.ConnectionId; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.command.TransactionInfo; -import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.transport.failover.FailoverTransport; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Test for AMQ-4950. - * Simulates an error during XA prepare call. - */ -public class AMQ4950Test extends BrokerRestartTestSupport { - - protected static final Logger LOG = LoggerFactory.getLogger(AMQ4950Test.class); - protected static final String simulatedExceptionMessage = "Simulating error inside tx prepare()."; - public boolean prioritySupport = false; - protected String connectionUri = null; - - @Override - protected void configureBroker(BrokerService broker) throws Exception { - broker.setDestinationPolicy(policyMap); - broker.setDeleteAllMessagesOnStartup(true); - broker.setUseJmx(false); - connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString(); - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - - @Override - public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { - getNext().prepareTransaction(context, xid); - LOG.debug("BrokerPlugin.prepareTransaction() will throw an exception."); - throw new XAException(simulatedExceptionMessage); - } - - @Override - public void commitTransaction(ConnectionContext context, - TransactionId xid, - boolean onePhase) throws Exception { - LOG.debug("BrokerPlugin.commitTransaction()."); - super.commitTransaction(context, xid, onePhase); - } - }}); - } - - /** - * Creates XA transaction and invokes XA prepare(). - * Due to registered BrokerFilter prepare will be handled by broker - * but then throw an exception. - * Prior to fixing AMQ-4950, this resulted in a ClassCastException - * in ConnectionStateTracker.PrepareReadonlyTransactionAction.onResponse() - * causing the failover transport to reconnect and replay the XA prepare(). - */ - public void testXAPrepareFailure() throws Exception { - - assertNotNull(connectionUri); - ActiveMQXAConnectionFactory cf = new ActiveMQXAConnectionFactory("failover:(" + connectionUri + ")"); - ActiveMQXAConnection xaConnection = (ActiveMQXAConnection) cf.createConnection(); - xaConnection.start(); - XASession session = xaConnection.createXASession(); - XAResource resource = session.getXAResource(); - Xid tid = createXid(); - resource.start(tid, XAResource.TMNOFLAGS); - - MessageProducer producer = session.createProducer(session.createQueue(this.getClass().getName())); - Message message = session.createTextMessage("Sample Message"); - producer.send(message); - resource.end(tid, XAResource.TMSUCCESS); - try { - LOG.debug("Calling XA prepare(), expecting an exception"); - int ret = resource.prepare(tid); - if (XAResource.XA_OK == ret) - resource.commit(tid, false); - } - catch (XAException xae) { - LOG.info("Received excpected XAException: {}", xae.getMessage()); - LOG.info("Rolling back transaction {}", tid); - - // with bug AMQ-4950 the thrown error reads "Cannot call prepare now" - // we check that we receive the original exception message as - // thrown by the BrokerPlugin - assertEquals(simulatedExceptionMessage, xae.getMessage()); - resource.rollback(tid); - } - // couple of assertions - assertTransactionGoneFromBroker(tid); - assertTransactionGoneFromConnection(broker.getBrokerName(), xaConnection.getClientID(), xaConnection.getConnectionInfo().getConnectionId(), tid); - assertTransactionGoneFromFailoverState(xaConnection, tid); - - //cleanup - producer.close(); - session.close(); - xaConnection.close(); - LOG.debug("testXAPrepareFailure() finished."); - } - - public Xid createXid() throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputStream os = new DataOutputStream(baos); - os.writeLong(++txGenerator); - os.close(); - final byte[] bs = baos.toByteArray(); - - return new Xid() { - @Override - public int getFormatId() { - return 86; - } - - @Override - public byte[] getGlobalTransactionId() { - return bs; - } - - @Override - public byte[] getBranchQualifier() { - return bs; - } - }; - } - - private void assertTransactionGoneFromFailoverState(ActiveMQXAConnection connection1, Xid tid) throws Exception { - - FailoverTransport transport = connection1.getTransport().narrow(FailoverTransport.class); - TransactionInfo info = new TransactionInfo(connection1.getConnectionInfo().getConnectionId(), new XATransactionId(tid), TransactionInfo.COMMIT_ONE_PHASE); - assertNull("transaction should not exist in the state tracker", transport.getStateTracker().processCommitTransactionOnePhase(info)); - } - - private void assertTransactionGoneFromBroker(Xid tid) throws Exception { - BrokerService broker = BrokerRegistry.getInstance().lookup("localhost"); - TransactionBroker transactionBroker = (TransactionBroker) broker.getBroker().getAdaptor(TransactionBroker.class); - try { - transactionBroker.getTransaction(null, new XATransactionId(tid), false); - fail("expected exception on tx not found"); - } - catch (XAException expectedOnNotFound) { - } - } - - private void assertTransactionGoneFromConnection(String brokerName, - String clientId, - ConnectionId connectionId, - Xid tid) throws Exception { - BrokerService broker = BrokerRegistry.getInstance().lookup(brokerName); - CopyOnWriteArrayList<TransportConnection> connections = broker.getTransportConnectors().get(0).getConnections(); - for (TransportConnection connection : connections) { - if (connection.getConnectionId().equals(clientId)) { - try { - connection.processPrepareTransaction(new TransactionInfo(connectionId, new XATransactionId(tid), TransactionInfo.PREPARE)); - fail("did not get expected excepton on missing transaction, it must be still there in error!"); - } - catch (IllegalStateException expectedOnNoTransaction) { - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java deleted file mode 100644 index 0b74979..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java +++ /dev/null @@ -1,511 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.net.URI; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.sql.DataSource; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.Broker; -import org.apache.activemq.broker.BrokerFilter; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.TransportConnector; -import org.apache.activemq.broker.region.policy.PolicyEntry; -import org.apache.activemq.broker.region.policy.PolicyMap; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; -import org.apache.activemq.network.NetworkConnector; -import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; -import org.apache.activemq.util.IntrospectionSupport; -import org.apache.activemq.util.Wait; -import org.apache.derby.jdbc.EmbeddedDataSource; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.junit.Assert.*; - -/** - * Test creates a broker network with two brokers - producerBroker (with a - * message producer attached) and consumerBroker (with consumer attached) - * <br> - * Simulates network duplicate message by stopping and restarting the - * consumerBroker after message (with message ID ending in 120) is persisted to - * consumerBrokerstore BUT BEFORE ack sent to the producerBroker over the - * network connection. When the network connection is reestablished the - * producerBroker resends message (with messageID ending in 120). - * <br> - * Expectation: - * <br> - * With the following policy entries set, would expect the duplicate message to - * be read from the store and dispatched to the consumer - where the duplicate - * could be detected by consumer. - * <br> - * PolicyEntry policy = new PolicyEntry(); policy.setQueue(">"); - * policy.setEnableAudit(false); policy.setUseCache(false); - * policy.setExpireMessagesPeriod(0); - * <br> - * <br> - * Note 1: Network needs to use replaywhenNoConsumers so enabling the - * networkAudit to avoid this scenario is not feasible. - * <br> - * NOTE 2: Added a custom plugin to the consumerBroker so that the - * consumerBroker shutdown will occur after a message has been persisted to - * consumerBroker store but before an ACK is sent back to ProducerBroker. This - * is just a hack to ensure producerBroker will resend the message after - * shutdown. - */ - -@RunWith(value = Parameterized.class) -public class AMQ4952Test { - - private static final Logger LOG = LoggerFactory.getLogger(AMQ4952Test.class); - - protected static final int MESSAGE_COUNT = 1; - - protected BrokerService consumerBroker; - protected BrokerService producerBroker; - - protected ActiveMQQueue QUEUE_NAME = new ActiveMQQueue("duptest.store"); - - private final CountDownLatch stopConsumerBroker = new CountDownLatch(1); - private final CountDownLatch consumerBrokerRestarted = new CountDownLatch(1); - private final CountDownLatch consumerRestartedAndMessageForwarded = new CountDownLatch(1); - - private EmbeddedDataSource localDataSource; - - @Parameterized.Parameter(0) - public boolean enableCursorAudit; - - @Parameterized.Parameters(name = "enableAudit={0}") - public static Iterable<Object[]> getTestParameters() { - return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); - } - - @Test - public void testConsumerBrokerRestart() throws Exception { - - Callable consumeMessageTask = new Callable() { - @Override - public Object call() throws Exception { - - int receivedMessageCount = 0; - - ActiveMQConnectionFactory consumerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2006)?randomize=false&backup=false"); - Connection consumerConnection = consumerFactory.createConnection(); - - try { - - consumerConnection.setClientID("consumer"); - consumerConnection.start(); - - Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - MessageConsumer messageConsumer = consumerSession.createConsumer(QUEUE_NAME); - - while (true) { - TextMessage textMsg = (TextMessage) messageConsumer.receive(5000); - - if (textMsg == null) { - return receivedMessageCount; - } - - receivedMessageCount++; - LOG.info("*** receivedMessageCount {} message has MessageID {} ", receivedMessageCount, textMsg.getJMSMessageID()); - - // on first delivery ensure the message is pending an - // ack when it is resent from the producer broker - if (textMsg.getJMSMessageID().endsWith("1") && receivedMessageCount == 1) { - LOG.info("Waiting for restart..."); - consumerRestartedAndMessageForwarded.await(90, TimeUnit.SECONDS); - } - - textMsg.acknowledge(); - } - } - finally { - consumerConnection.close(); - } - } - }; - - Runnable consumerBrokerResetTask = new Runnable() { - @Override - public void run() { - - try { - // wait for signal - stopConsumerBroker.await(); - - LOG.info("********* STOPPING CONSUMER BROKER"); - - consumerBroker.stop(); - consumerBroker.waitUntilStopped(); - - LOG.info("***** STARTING CONSUMER BROKER"); - // do not delete messages on startup - consumerBroker = createConsumerBroker(false); - - LOG.info("***** CONSUMER BROKER STARTED!!"); - consumerBrokerRestarted.countDown(); - - assertTrue("message forwarded on time", Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - LOG.info("ProducerBroker totalMessageCount: " + producerBroker.getAdminView().getTotalMessageCount()); - return producerBroker.getAdminView().getTotalMessageCount() == 0; - } - })); - consumerRestartedAndMessageForwarded.countDown(); - - } - catch (Exception e) { - LOG.error("Exception when stopping/starting the consumerBroker ", e); - } - - } - }; - - ExecutorService executor = Executors.newFixedThreadPool(2); - - // start consumerBroker start/stop task - executor.execute(consumerBrokerResetTask); - - // start consuming messages - Future<Integer> numberOfConsumedMessage = executor.submit(consumeMessageTask); - - produceMessages(); - - // Wait for consumer to finish - int totalMessagesConsumed = numberOfConsumedMessage.get(); - - StringBuffer contents = new StringBuffer(); - boolean messageInStore = isMessageInJDBCStore(localDataSource, contents); - LOG.debug("****number of messages received " + totalMessagesConsumed); - - assertEquals("number of messages received", 2, totalMessagesConsumed); - assertEquals("messages left in store", true, messageInStore); - assertTrue("message is in dlq: " + contents.toString(), contents.toString().contains("DLQ")); - } - - private void produceMessages() throws JMSException { - - ActiveMQConnectionFactory producerFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:2003)?randomize=false&backup=false"); - Connection producerConnection = producerFactory.createConnection(); - - try { - producerConnection.setClientID("producer"); - producerConnection.start(); - - Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - final MessageProducer remoteProducer = producerSession.createProducer(QUEUE_NAME); - - int i = 0; - while (MESSAGE_COUNT > i) { - String payload = "test msg " + i; - TextMessage msg = producerSession.createTextMessage(payload); - remoteProducer.send(msg); - i++; - } - - } - finally { - producerConnection.close(); - } - } - - @Before - public void setUp() throws Exception { - LOG.debug("Running with enableCursorAudit set to {}", this.enableCursorAudit); - doSetUp(); - } - - @After - public void tearDown() throws Exception { - doTearDown(); - } - - protected void doTearDown() throws Exception { - - try { - producerBroker.stop(); - } - catch (Exception ex) { - } - try { - consumerBroker.stop(); - } - catch (Exception ex) { - } - } - - protected void doSetUp() throws Exception { - producerBroker = createProducerBroker(); - consumerBroker = createConsumerBroker(true); - } - - /** - * Producer broker listens on localhost:2003 networks to consumerBroker - - * localhost:2006 - * - * @return - * @throws Exception - */ - protected BrokerService createProducerBroker() throws Exception { - - String networkToPorts[] = new String[]{"2006"}; - HashMap<String, String> networkProps = new HashMap<>(); - - networkProps.put("networkTTL", "10"); - networkProps.put("conduitSubscriptions", "true"); - networkProps.put("decreaseNetworkConsumerPriority", "true"); - networkProps.put("dynamicOnly", "true"); - - BrokerService broker = new BrokerService(); - broker.getManagementContext().setCreateConnector(false); - broker.setDeleteAllMessagesOnStartup(true); - broker.setBrokerName("BP"); - broker.setAdvisorySupport(false); - - // lazy init listener on broker start - TransportConnector transportConnector = new TransportConnector(); - transportConnector.setUri(new URI("tcp://localhost:2003")); - List<TransportConnector> transportConnectors = new ArrayList<>(); - transportConnectors.add(transportConnector); - broker.setTransportConnectors(transportConnectors); - - // network to consumerBroker - - if (networkToPorts.length > 0) { - StringBuilder builder = new StringBuilder("static:(failover:(tcp://localhost:2006)?maxReconnectAttempts=0)?useExponentialBackOff=false"); - NetworkConnector nc = broker.addNetworkConnector(builder.toString()); - IntrospectionSupport.setProperties(nc, networkProps); - nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQQueue[]{QUEUE_NAME})); - } - - // Persistence adapter - - JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter(); - EmbeddedDataSource remoteDataSource = new EmbeddedDataSource(); - remoteDataSource.setDatabaseName("target/derbyDBRemoteBroker"); - remoteDataSource.setCreateDatabase("create"); - jdbc.setDataSource(remoteDataSource); - broker.setPersistenceAdapter(jdbc); - - // set Policy entries - PolicyEntry policy = new PolicyEntry(); - - policy.setQueue(">"); - policy.setEnableAudit(false); - policy.setUseCache(false); - policy.setExpireMessagesPeriod(0); - - // set replay with no consumers - ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); - conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); - policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); - - PolicyMap pMap = new PolicyMap(); - pMap.setDefaultEntry(policy); - broker.setDestinationPolicy(pMap); - - broker.start(); - broker.waitUntilStarted(); - - return broker; - } - - /** - * consumerBroker - listens on localhost:2006 - * - * @param deleteMessages - drop messages when broker instance is created - * @return - * @throws Exception - */ - protected BrokerService createConsumerBroker(boolean deleteMessages) throws Exception { - - String scheme = "tcp"; - String listenPort = "2006"; - - BrokerService broker = new BrokerService(); - broker.getManagementContext().setCreateConnector(false); - broker.setDeleteAllMessagesOnStartup(deleteMessages); - broker.setBrokerName("BC"); - // lazy init listener on broker start - TransportConnector transportConnector = new TransportConnector(); - transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort)); - List<TransportConnector> transportConnectors = new ArrayList<>(); - transportConnectors.add(transportConnector); - broker.setTransportConnectors(transportConnectors); - - // policy entries - - PolicyEntry policy = new PolicyEntry(); - - policy.setQueue(">"); - policy.setEnableAudit(enableCursorAudit); - policy.setExpireMessagesPeriod(0); - - // set replay with no consumers - ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); - conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true); - policy.setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory); - - PolicyMap pMap = new PolicyMap(); - - pMap.setDefaultEntry(policy); - broker.setDestinationPolicy(pMap); - - // Persistence adapter - JDBCPersistenceAdapter localJDBCPersistentAdapter = new JDBCPersistenceAdapter(); - EmbeddedDataSource localDataSource = new EmbeddedDataSource(); - localDataSource.setDatabaseName("target/derbyDBLocalBroker"); - localDataSource.setCreateDatabase("create"); - localJDBCPersistentAdapter.setDataSource(localDataSource); - broker.setPersistenceAdapter(localJDBCPersistentAdapter); - - if (deleteMessages) { - // no plugin on restart - broker.setPlugins(new BrokerPlugin[]{new MyTestPlugin()}); - } - - this.localDataSource = localDataSource; - - broker.start(); - broker.waitUntilStarted(); - - return broker; - } - - /** - * Query JDBC Store to see if messages are left - * - * @param dataSource - * @return - * @throws SQLException - */ - private boolean isMessageInJDBCStore(DataSource dataSource, StringBuffer stringBuffer) throws SQLException { - - boolean tableHasData = false; - String query = "select * from ACTIVEMQ_MSGS"; - - java.sql.Connection conn = dataSource.getConnection(); - PreparedStatement s = conn.prepareStatement(query); - - ResultSet set = null; - - try { - StringBuffer headers = new StringBuffer(); - set = s.executeQuery(); - ResultSetMetaData metaData = set.getMetaData(); - for (int i = 1; i <= metaData.getColumnCount(); i++) { - - if (i == 1) { - headers.append("||"); - } - headers.append(metaData.getColumnName(i) + "||"); - } - LOG.error(headers.toString()); - - while (set.next()) { - tableHasData = true; - - for (int i = 1; i <= metaData.getColumnCount(); i++) { - if (i == 1) { - stringBuffer.append("|"); - } - stringBuffer.append(set.getString(i) + "|"); - } - LOG.error(stringBuffer.toString()); - } - } - finally { - try { - set.close(); - } - catch (Throwable ignore) { - } - try { - s.close(); - } - catch (Throwable ignore) { - } - - conn.close(); - } - - return tableHasData; - } - - /** - * plugin used to ensure consumerbroker is restared before the network - * message from producerBroker is acked - */ - class MyTestPlugin implements BrokerPlugin { - - @Override - public Broker installPlugin(Broker broker) throws Exception { - return new MyTestBroker(broker); - } - } - - class MyTestBroker extends BrokerFilter { - - public MyTestBroker(Broker next) { - super(next); - } - - @Override - public void send(ProducerBrokerExchange producerExchange, - org.apache.activemq.command.Message messageSend) throws Exception { - - super.send(producerExchange, messageSend); - LOG.error("Stopping broker on send: " + messageSend.getMessageId().getProducerSequenceId()); - stopConsumerBroker.countDown(); - producerExchange.getConnectionContext().setDontSendReponse(true); - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java deleted file mode 100644 index beab4c3..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import static org.junit.Assert.assertNotNull; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.Session; -import javax.jms.Topic; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.jmx.BrokerViewMBean; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ5035Test { - - private static final String CLIENT_ID = "amq-test-client-id"; - private static final String DURABLE_SUB_NAME = "testDurable"; - - private final String xbean = "xbean:"; - private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq5035"; - - private static BrokerService brokerService; - private String connectionUri; - - @Before - public void setUp() throws Exception { - brokerService = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml"); - connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString(); - brokerService.setDeleteAllMessagesOnStartup(true); - brokerService.start(); - brokerService.waitUntilStarted(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - brokerService.waitUntilStopped(); - } - - @Test - public void testFoo() throws Exception { - ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); - Connection connection = factory.createConnection(); - connection.setClientID(CLIENT_ID); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("Test.Topic"); - MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUB_NAME); - consumer.close(); - - BrokerViewMBean brokerView = getBrokerView(DURABLE_SUB_NAME); - brokerView.destroyDurableSubscriber(CLIENT_ID, DURABLE_SUB_NAME); - } - - private BrokerViewMBean getBrokerView(String testDurable) throws MalformedObjectNameException { - ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost"); - BrokerViewMBean view = (BrokerViewMBean) brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true); - assertNotNull(view); - return view; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java deleted file mode 100644 index 8596683..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5136Test.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.bugs; - -import javax.jms.BytesMessage; -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerRegistry; -import org.apache.activemq.broker.BrokerService; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -public class AMQ5136Test { - - BrokerService brokerService; - - @Before - public void startBroker() throws Exception { - brokerService = new BrokerService(); - brokerService.setPersistent(false); - brokerService.start(); - } - - @After - public void stopBroker() throws Exception { - brokerService.stop(); - } - - @Test - public void memoryUsageOnCommit() throws Exception { - sendMessagesAndAssertMemoryUsage(new TransactionHandler() { - @Override - public void finishTransaction(Session session) throws JMSException { - session.commit(); - } - }); - } - - @Test - public void memoryUsageOnRollback() throws Exception { - sendMessagesAndAssertMemoryUsage(new TransactionHandler() { - @Override - public void finishTransaction(Session session) throws JMSException { - session.rollback(); - } - }); - } - - private void sendMessagesAndAssertMemoryUsage(TransactionHandler transactionHandler) throws Exception { - ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost"); - Connection connection = connectionFactory.createConnection(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Topic destination = session.createTopic("ActiveMQBug"); - MessageProducer producer = session.createProducer(destination); - for (int i = 0; i < 100; i++) { - BytesMessage message = session.createBytesMessage(); - message.writeBytes(generateBytes()); - producer.send(message); - transactionHandler.finishTransaction(session); - } - connection.close(); - org.junit.Assert.assertEquals(0, BrokerRegistry.getInstance().findFirst().getSystemUsage().getMemoryUsage().getPercentUsage()); - } - - private byte[] generateBytes() { - byte[] bytes = new byte[100000]; - for (int i = 0; i < 100000; i++) { - bytes[i] = (byte) i; - } - return bytes; - } - - private static interface TransactionHandler { - - void finishTransaction(Session session) throws JMSException; - } -} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/449bffd0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java deleted file mode 100644 index dc37c79..0000000 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5212Test.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.bugs; - -import java.util.Arrays; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Session; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQMessageProducer; -import org.apache.activemq.ActiveMQSession; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTextMessage; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; -import org.apache.activemq.util.Wait; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -@RunWith(value = Parameterized.class) -public class AMQ5212Test { - - BrokerService brokerService; - - @Parameterized.Parameter(0) - public boolean concurrentStoreAndDispatchQ = true; - - @Parameterized.Parameters(name = "concurrentStoreAndDispatch={0}") - public static Iterable<Object[]> getTestParameters() { - return Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}); - } - - @Before - public void setUp() throws Exception { - start(true); - } - - public void start(boolean deleteAllMessages) throws Exception { - brokerService = new BrokerService(); - if (deleteAllMessages) { - brokerService.deleteAllMessages(); - } - ((KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQ); - brokerService.addConnector("tcp://localhost:0"); - brokerService.setAdvisorySupport(false); - brokerService.start(); - } - - @After - public void tearDown() throws Exception { - brokerService.stop(); - } - - @Test - public void verifyDuplicateSuppressionWithConsumer() throws Exception { - doVerifyDuplicateSuppression(100, 100, true); - } - - @Test - public void verifyDuplicateSuppression() throws Exception { - doVerifyDuplicateSuppression(100, 100, false); - } - - public void doVerifyDuplicateSuppression(final int numToSend, - final int expectedTotalEnqueue, - final boolean demand) throws Exception { - final ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); - connectionFactory.setCopyMessageOnSend(false); - connectionFactory.setWatchTopicAdvisories(false); - - final int concurrency = 40; - final AtomicInteger workCount = new AtomicInteger(numToSend); - ExecutorService executorService = Executors.newFixedThreadPool(concurrency); - for (int i = 0; i < concurrency; i++) { - executorService.execute(new Runnable() { - @Override - public void run() { - try { - int i; - while ((i = workCount.getAndDecrement()) > 0) { - ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); - activeMQConnection.start(); - ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - ActiveMQQueue dest = new ActiveMQQueue("queue-" + i + "-" + AMQ5212Test.class.getSimpleName()); - ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); - if (demand) { - // create demand so page in will happen - activeMQSession.createConsumer(dest); - } - ActiveMQTextMessage message = new ActiveMQTextMessage(); - message.setDestination(dest); - activeMQMessageProducer.send(message, null); - - // send a duplicate - activeMQConnection.syncSendPacket(message); - activeMQConnection.close(); - - } - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - TimeUnit.SECONDS.sleep(1); - executorService.shutdown(); - executorService.awaitTermination(5, TimeUnit.MINUTES); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return expectedTotalEnqueue == brokerService.getAdminView().getTotalEnqueueCount(); - } - }); - assertEquals("total enqueue as expected", expectedTotalEnqueue, brokerService.getAdminView().getTotalEnqueueCount()); - } - - @Test - public void verifyConsumptionOnDuplicate() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); - connectionFactory.setCopyMessageOnSend(false); - connectionFactory.setWatchTopicAdvisories(false); - - ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); - activeMQConnection.start(); - ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - ActiveMQQueue dest = new ActiveMQQueue("Q"); - ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); - ActiveMQTextMessage message = new ActiveMQTextMessage(); - message.setDestination(dest); - activeMQMessageProducer.send(message, null); - - // send a duplicate - activeMQConnection.syncSendPacket(message); - - activeMQConnection.close(); - - // verify original can be consumed after restart - brokerService.stop(); - brokerService.start(false); - - connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); - connectionFactory.setCopyMessageOnSend(false); - connectionFactory.setWatchTopicAdvisories(false); - - activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); - activeMQConnection.start(); - activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - - MessageConsumer messageConsumer = activeMQSession.createConsumer(dest); - Message received = messageConsumer.receive(4000); - assertNotNull("Got message", received); - assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); - - activeMQConnection.close(); - } - - @Test - public void verifyClientAckConsumptionOnDuplicate() throws Exception { - - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); - connectionFactory.setCopyMessageOnSend(false); - connectionFactory.setWatchTopicAdvisories(false); - - ActiveMQConnection activeMQConnection = (ActiveMQConnection) connectionFactory.createConnection(); - activeMQConnection.start(); - ActiveMQSession activeMQSession = (ActiveMQSession) activeMQConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - ActiveMQQueue dest = new ActiveMQQueue("Q"); - - MessageConsumer messageConsumer = activeMQSession.createConsumer(dest); - - ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) activeMQSession.createProducer(dest); - ActiveMQTextMessage message = new ActiveMQTextMessage(); - message.setDestination(dest); - activeMQMessageProducer.send(message, null); - - // send a duplicate - activeMQConnection.syncSendPacket(message); - - Message received = messageConsumer.receive(4000); - assertNotNull("Got message", received); - assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); - messageConsumer.close(); - - messageConsumer = activeMQSession.createConsumer(dest); - received = messageConsumer.receive(4000); - assertNotNull("Got message", received); - assertEquals("match", message.getJMSMessageID(), received.getJMSMessageID()); - received.acknowledge(); - - activeMQConnection.close(); - } -}