This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 824e0b6 NO-JIRA Adding tests over JMSBridge statistics new 9a2de3e This closes #2584 824e0b6 is described below commit 824e0b6e7e5e10b121e0361634aee71694340e1e Author: Emmanuel Hugonnet <emmanuel.hugonnet+gitkra...@gmail.com> AuthorDate: Tue Mar 12 18:01:20 2019 +0100 NO-JIRA Adding tests over JMSBridge statistics --- pom.xml | 1 + .../jms/bridge/FailingTransactionManager.java | 172 +++++++++++++++++++++ .../tests/extras/jms/bridge/JMSBridgeTest.java | 52 ++++++- 3 files changed, 223 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 2d7972c..7d7ba99 100644 --- a/pom.xml +++ b/pom.xml @@ -1600,6 +1600,7 @@ <exclude>**/.factorypath</exclude> <exclude>**/org.apache.activemq.artemis.cfg</exclude> <exclude>**/nb-configuration.xml</exclude> + <exclude>**/nbactions-tests.xml</exclude> <!-- activemq5 unit tests exclude --> <exclude>**/*.data</exclude> <exclude>**/*.bin</exclude> diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/FailingTransactionManager.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/FailingTransactionManager.java new file mode 100644 index 0000000..e124b06 --- /dev/null +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/FailingTransactionManager.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.extras.jms.bridge; + +import com.arjuna.ats.arjuna.common.Uid; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import javax.transaction.HeuristicMixedException; +import javax.transaction.HeuristicRollbackException; +import javax.transaction.InvalidTransactionException; +import javax.transaction.NotSupportedException; +import javax.transaction.RollbackException; +import javax.transaction.Synchronization; +import javax.transaction.SystemException; +import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; +import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger; + +public class FailingTransactionManager implements TransactionManager { + + private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER; + + private final TransactionManager tm; + private int calls; + private final int limit; + private final AtomicInteger failures = new AtomicInteger(0); + private final Map<Uid, FailingTransaction> transactions = Collections.synchronizedMap(new HashMap<>(10)); + + public FailingTransactionManager(TransactionManager tm, int limit) { + this.tm = tm; + this.calls = 0; + this.limit = limit; + } + + @Override + public void begin() throws NotSupportedException, SystemException { + tm.begin(); + } + + @Override + public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException { + transactions.remove(((com.arjuna.ats.jta.transaction.Transaction) tm.getTransaction()).get_uid()).commit(); + } + + @Override + public void rollback() throws IllegalStateException, SecurityException, SystemException { + transactions.remove(((com.arjuna.ats.jta.transaction.Transaction) tm.getTransaction()).get_uid()).rollback(); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + tm.setRollbackOnly(); + } + + @Override + public int getStatus() throws SystemException { + return tm.getStatus(); + } + + @Override + public Transaction getTransaction() throws SystemException { + com.arjuna.ats.jta.transaction.Transaction real = (com.arjuna.ats.jta.transaction.Transaction) tm.getTransaction(); + if (transactions.containsKey(real.get_uid())) { + return transactions.get(real.get_uid()); + } + FailingTransaction tx = new FailingTransaction(real, calls++); + transactions.put(real.get_uid(), tx); + return tx; + } + + @Override + public void setTransactionTimeout(int i) throws SystemException { + tm.setTransactionTimeout(i); + } + + @Override + public Transaction suspend() throws SystemException { + Transaction real = tm.suspend(); + if (real == null) { + return null; + } + return transactions.get(((com.arjuna.ats.jta.transaction.Transaction) real).get_uid()); + } + + public int getFailures() { + return failures.get(); + } + + @Override + public void resume(Transaction transaction) throws InvalidTransactionException, IllegalStateException, SystemException { + tm.resume(((FailingTransaction)transaction).transaction); + } + + private final class FailingTransaction implements Transaction { + + private final com.arjuna.ats.jta.transaction.Transaction transaction; + private final int number; + + private FailingTransaction(com.arjuna.ats.jta.transaction.Transaction transaction, int number) { + this.transaction = transaction; + this.number = number; + } + + @Override + public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException { + if (number < limit) { + transaction.commit(); + transactions.remove(transaction.get_uid()); + } else { + int fails = failures.incrementAndGet(); + RollbackException ex = new RollbackException("Expected rollback for test"); + log.tracef(ex, "We are about to fail commit for %s th time", fails); + throw ex; + } + } + + + @Override + public boolean delistResource(XAResource arg0, int arg1) throws IllegalStateException, SystemException { + return transaction.delistResource(arg0, arg1); + } + + @Override + public boolean enlistResource(XAResource arg0) throws RollbackException, IllegalStateException, SystemException { + return transaction.enlistResource(arg0); + } + + @Override + public int getStatus() throws SystemException { + return transaction.getStatus(); + } + + @Override + public void registerSynchronization(Synchronization arg0) throws RollbackException, IllegalStateException, SystemException { + transaction.registerSynchronization(arg0); + } + + @Override + public void rollback() throws IllegalStateException, SystemException { + transaction.rollback(); + transactions.remove(transaction.get_uid()); + } + + @Override + public void setRollbackOnly() throws IllegalStateException, SystemException { + transaction.setRollbackOnly(); + } + + @Override + public String toString() { + return "FailingTransaction{" + "transaction=" + transaction.get_uid() + ", number=" + number + '}'; + } + } + +} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java index 41288a9..af1f0fb 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java @@ -42,6 +42,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.api.jms.JMSFactoryType; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import static org.apache.activemq.artemis.core.settings.impl.AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS; import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl; @@ -791,6 +792,8 @@ public class JMSBridgeTest extends BridgeTestBase { TransactionManager mgr = newTransactionManager(); + final int NUM_MESSAGES = 10; + try { toResume = mgr.suspend(); @@ -799,14 +802,14 @@ public class JMSBridgeTest extends BridgeTestBase { started = mgr.getTransaction(); - final int NUM_MESSAGES = 10; - bridge = new JMSBridgeImpl(cff0, cff1, sourceTopicFactory, targetQueueFactory, null, null, null, null, null, 5000, 10, QualityOfServiceMode.AT_MOST_ONCE, 1, -1, null, null, false).setBridgeName("test-bridge"); bridge.start(); sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false, largeMessage); checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage); + Assert.assertEquals(0L, bridge.getAbortedMessageCount()); + Assert.assertEquals("We didn't get the correct number processed messages", NUM_MESSAGES, bridge.getMessageCount()); } finally { if (started != null) { try { @@ -830,6 +833,47 @@ public class JMSBridgeTest extends BridgeTestBase { } @Test + public void testAbortedMessages() throws Exception { + JMSBridgeImpl bridge = null; + + final int NUM_MESSAGES = 20; + final int MAX_BATCH_SIZE = 1; + final int RETRY = 2; + final int LIMIT = 2; + final int FAILURES = (NUM_MESSAGES - LIMIT) * DEFAULT_MAX_DELIVERY_ATTEMPTS; + FailingTransactionManager transactionManager = new FailingTransactionManager(newTransactionManager(), LIMIT); + try { + bridge = new JMSBridgeImpl(cff0, cff1, sourceQueueFactory, targetQueueFactory, null, null, null, null, null, 5000, RETRY, QualityOfServiceMode.ONCE_AND_ONLY_ONCE, MAX_BATCH_SIZE, -1, null, null, false).setBridgeName("test-bridge"); + bridge.setTransactionManager(transactionManager); + bridge.start(); + sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES, false, false); + try (Connection conn = cf1.createConnection()) { + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer cons = sess.createConsumer(targetQueue); + // Consume the messages + for (int i = 0; i <= LIMIT; i++) { + Message tm = cons.receive(3000); + if (tm != null) { + Assert.assertNotNull("Message " + i + " is null", tm); + Assert.assertEquals("message" + i, ((TextMessage) tm).getText()); + } + } + } + Assert.assertEquals("We didn't get the correct number failures", FAILURES, transactionManager.getFailures()); + Assert.assertEquals("We didn't get the correct number of aborted messages", FAILURES, bridge.getAbortedMessageCount()); + Assert.assertEquals("We didn't get the correct number of processed messages", FAILURES + LIMIT, bridge.getMessageCount()); + } finally { + if (bridge != null) { + bridge.stop(); + } + } + Assert.assertEquals("We didn't get the correct number failures", FAILURES, transactionManager.getFailures()); + Assert.assertEquals("We didn't get the correct number of aborted messages", FAILURES, bridge.getAbortedMessageCount()); + Assert.assertEquals("We didn't get the correct number of processed messages", FAILURES + LIMIT, bridge.getMessageCount()); + } + + @Test public void testNonDurableSubscriberLargeMessage() throws Exception { internalTestNonDurableSubscriber(true, 1); } @@ -852,6 +896,8 @@ public class JMSBridgeTest extends BridgeTestBase { sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false, largeMessage); checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage); + Assert.assertEquals(0L, bridge.getAbortedMessageCount()); + Assert.assertEquals("We didn't get the correct number processed messages", NUM_MESSAGES, bridge.getMessageCount()); } finally { if (bridge != null) { bridge.stop(); @@ -882,6 +928,8 @@ public class JMSBridgeTest extends BridgeTestBase { sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, true, largeMessage); checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, largeMessage); + Assert.assertEquals(0L, bridge.getAbortedMessageCount()); + Assert.assertEquals("We didn't get the correct number processed messages", NUM_MESSAGES, bridge.getMessageCount()); } finally { if (bridge != null) { bridge.stop();