This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 824e0b6  NO-JIRA Adding tests over JMSBridge statistics
     new 9a2de3e  This closes #2584
824e0b6 is described below

commit 824e0b6e7e5e10b121e0361634aee71694340e1e
Author: Emmanuel Hugonnet <emmanuel.hugonnet+gitkra...@gmail.com>
AuthorDate: Tue Mar 12 18:01:20 2019 +0100

    NO-JIRA Adding tests over JMSBridge statistics
---
 pom.xml                                            |   1 +
 .../jms/bridge/FailingTransactionManager.java      | 172 +++++++++++++++++++++
 .../tests/extras/jms/bridge/JMSBridgeTest.java     |  52 ++++++-
 3 files changed, 223 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2d7972c..7d7ba99 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1600,6 +1600,7 @@
                   <exclude>**/.factorypath</exclude>
                   <exclude>**/org.apache.activemq.artemis.cfg</exclude>
                   <exclude>**/nb-configuration.xml</exclude>
+                  <exclude>**/nbactions-tests.xml</exclude>
                   <!-- activemq5 unit tests exclude -->
                   <exclude>**/*.data</exclude>
                   <exclude>**/*.bin</exclude>
diff --git 
a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/FailingTransactionManager.java
 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/FailingTransactionManager.java
new file mode 100644
index 0000000..e124b06
--- /dev/null
+++ 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/FailingTransactionManager.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.extras.jms.bridge;
+
+import com.arjuna.ats.arjuna.common.Uid;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.transaction.HeuristicMixedException;
+import javax.transaction.HeuristicRollbackException;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.NotSupportedException;
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAResource;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+
+public class FailingTransactionManager implements TransactionManager {
+
+   private static final IntegrationTestLogger log = 
IntegrationTestLogger.LOGGER;
+
+   private final TransactionManager tm;
+   private int calls;
+   private final int limit;
+   private final AtomicInteger failures = new AtomicInteger(0);
+   private final Map<Uid, FailingTransaction> transactions = 
Collections.synchronizedMap(new HashMap<>(10));
+
+   public FailingTransactionManager(TransactionManager tm, int limit) {
+      this.tm = tm;
+      this.calls = 0;
+      this.limit = limit;
+   }
+
+   @Override
+   public void begin() throws NotSupportedException, SystemException {
+      tm.begin();
+   }
+
+   @Override
+   public void commit() throws RollbackException, HeuristicMixedException, 
HeuristicRollbackException, SecurityException, IllegalStateException, 
SystemException {
+      transactions.remove(((com.arjuna.ats.jta.transaction.Transaction) 
tm.getTransaction()).get_uid()).commit();
+   }
+
+   @Override
+   public void rollback() throws IllegalStateException, SecurityException, 
SystemException {
+      transactions.remove(((com.arjuna.ats.jta.transaction.Transaction) 
tm.getTransaction()).get_uid()).rollback();
+   }
+
+   @Override
+   public void setRollbackOnly() throws IllegalStateException, SystemException 
{
+      tm.setRollbackOnly();
+   }
+
+   @Override
+   public int getStatus() throws SystemException {
+      return tm.getStatus();
+   }
+
+   @Override
+   public Transaction getTransaction() throws SystemException {
+      com.arjuna.ats.jta.transaction.Transaction real =  
(com.arjuna.ats.jta.transaction.Transaction) tm.getTransaction();
+      if (transactions.containsKey(real.get_uid())) {
+         return transactions.get(real.get_uid());
+      }
+      FailingTransaction tx = new FailingTransaction(real, calls++);
+      transactions.put(real.get_uid(), tx);
+      return tx;
+   }
+
+   @Override
+   public void setTransactionTimeout(int i) throws SystemException {
+      tm.setTransactionTimeout(i);
+   }
+
+   @Override
+   public Transaction suspend() throws SystemException {
+      Transaction real = tm.suspend();
+      if (real == null) {
+         return null;
+      }
+      return transactions.get(((com.arjuna.ats.jta.transaction.Transaction) 
real).get_uid());
+   }
+
+   public int getFailures() {
+      return failures.get();
+   }
+
+   @Override
+   public void resume(Transaction transaction) throws 
InvalidTransactionException, IllegalStateException, SystemException {
+      tm.resume(((FailingTransaction)transaction).transaction);
+   }
+
+   private final class FailingTransaction implements Transaction {
+
+      private final com.arjuna.ats.jta.transaction.Transaction transaction;
+      private final int number;
+
+      private FailingTransaction(com.arjuna.ats.jta.transaction.Transaction 
transaction, int number) {
+         this.transaction = transaction;
+         this.number = number;
+      }
+
+      @Override
+      public void commit() throws RollbackException, HeuristicMixedException, 
HeuristicRollbackException, SecurityException, IllegalStateException, 
SystemException {
+         if (number < limit) {
+            transaction.commit();
+            transactions.remove(transaction.get_uid());
+         } else {
+            int fails = failures.incrementAndGet();
+            RollbackException ex = new RollbackException("Expected rollback 
for test");
+            log.tracef(ex, "We are about to fail commit for %s th  time", 
fails);
+            throw ex;
+         }
+      }
+
+
+      @Override
+      public boolean delistResource(XAResource arg0, int arg1) throws 
IllegalStateException, SystemException {
+         return transaction.delistResource(arg0, arg1);
+      }
+
+      @Override
+      public boolean enlistResource(XAResource arg0) throws RollbackException, 
IllegalStateException, SystemException {
+         return transaction.enlistResource(arg0);
+      }
+
+      @Override
+      public int getStatus() throws SystemException {
+         return transaction.getStatus();
+      }
+
+      @Override
+      public void registerSynchronization(Synchronization arg0) throws 
RollbackException, IllegalStateException, SystemException {
+         transaction.registerSynchronization(arg0);
+      }
+
+      @Override
+      public void rollback() throws IllegalStateException, SystemException {
+         transaction.rollback();
+         transactions.remove(transaction.get_uid());
+      }
+
+      @Override
+      public void setRollbackOnly() throws IllegalStateException, 
SystemException {
+         transaction.setRollbackOnly();
+      }
+
+      @Override
+      public String toString() {
+         return "FailingTransaction{" + "transaction=" + transaction.get_uid() 
+ ", number=" + number + '}';
+      }
+   }
+
+}
diff --git 
a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
index 41288a9..af1f0fb 100644
--- 
a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
+++ 
b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java
@@ -42,6 +42,7 @@ import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
 import org.apache.activemq.artemis.api.jms.JMSFactoryType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import static 
org.apache.activemq.artemis.core.settings.impl.AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS;
 import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
 import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
 import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
@@ -791,6 +792,8 @@ public class JMSBridgeTest extends BridgeTestBase {
 
       TransactionManager mgr = newTransactionManager();
 
+      final int NUM_MESSAGES = 10;
+
       try {
 
          toResume = mgr.suspend();
@@ -799,14 +802,14 @@ public class JMSBridgeTest extends BridgeTestBase {
 
          started = mgr.getTransaction();
 
-         final int NUM_MESSAGES = 10;
-
          bridge = new JMSBridgeImpl(cff0, cff1, sourceTopicFactory, 
targetQueueFactory, null, null, null, null, null, 5000, 10, 
QualityOfServiceMode.AT_MOST_ONCE, 1, -1, null, null, 
false).setBridgeName("test-bridge");
          bridge.start();
 
          sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false, largeMessage);
 
          checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, 
largeMessage);
+         Assert.assertEquals(0L, bridge.getAbortedMessageCount());
+         Assert.assertEquals("We didn't get the correct number processed 
messages", NUM_MESSAGES, bridge.getMessageCount());
       } finally {
          if (started != null) {
             try {
@@ -830,6 +833,47 @@ public class JMSBridgeTest extends BridgeTestBase {
    }
 
    @Test
+   public void testAbortedMessages() throws Exception {
+      JMSBridgeImpl bridge = null;
+
+      final int NUM_MESSAGES = 20;
+      final int MAX_BATCH_SIZE = 1;
+      final int RETRY = 2;
+      final int LIMIT = 2;
+      final int FAILURES = (NUM_MESSAGES - LIMIT) * 
DEFAULT_MAX_DELIVERY_ATTEMPTS;
+      FailingTransactionManager transactionManager = new 
FailingTransactionManager(newTransactionManager(), LIMIT);
+      try {
+         bridge = new JMSBridgeImpl(cff0, cff1, sourceQueueFactory, 
targetQueueFactory, null, null, null, null, null, 5000, RETRY, 
QualityOfServiceMode.ONCE_AND_ONLY_ONCE, MAX_BATCH_SIZE, -1, null, null, 
false).setBridgeName("test-bridge");
+         bridge.setTransactionManager(transactionManager);
+         bridge.start();
+         sendMessages(cf0, sourceQueue, 0, NUM_MESSAGES, false, false);
+         try (Connection conn = cf1.createConnection()) {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer cons = sess.createConsumer(targetQueue);
+            // Consume the messages
+            for (int i = 0; i <= LIMIT; i++) {
+               Message tm = cons.receive(3000);
+               if (tm != null) {
+                  Assert.assertNotNull("Message " + i + " is null", tm);
+                  Assert.assertEquals("message" + i, ((TextMessage) 
tm).getText());
+               }
+            }
+         }
+         Assert.assertEquals("We didn't get the correct number failures", 
FAILURES, transactionManager.getFailures());
+         Assert.assertEquals("We didn't get the correct number of aborted 
messages", FAILURES, bridge.getAbortedMessageCount());
+         Assert.assertEquals("We didn't get the correct number of processed 
messages", FAILURES + LIMIT, bridge.getMessageCount());
+      } finally {
+         if (bridge != null) {
+            bridge.stop();
+         }
+      }
+      Assert.assertEquals("We didn't get the correct number failures", 
FAILURES, transactionManager.getFailures());
+      Assert.assertEquals("We didn't get the correct number of aborted 
messages", FAILURES, bridge.getAbortedMessageCount());
+      Assert.assertEquals("We didn't get the correct number of processed 
messages", FAILURES + LIMIT, bridge.getMessageCount());
+   }
+
+   @Test
    public void testNonDurableSubscriberLargeMessage() throws Exception {
       internalTestNonDurableSubscriber(true, 1);
    }
@@ -852,6 +896,8 @@ public class JMSBridgeTest extends BridgeTestBase {
          sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, false, largeMessage);
 
          checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, 
largeMessage);
+         Assert.assertEquals(0L, bridge.getAbortedMessageCount());
+         Assert.assertEquals("We didn't get the correct number processed 
messages", NUM_MESSAGES, bridge.getMessageCount());
       } finally {
          if (bridge != null) {
             bridge.stop();
@@ -882,6 +928,8 @@ public class JMSBridgeTest extends BridgeTestBase {
          sendMessages(cf0, sourceTopic, 0, NUM_MESSAGES, true, largeMessage);
 
          checkAllMessageReceivedInOrder(cf1, targetQueue, 0, NUM_MESSAGES, 
largeMessage);
+         Assert.assertEquals(0L, bridge.getAbortedMessageCount());
+         Assert.assertEquals("We didn't get the correct number processed 
messages", NUM_MESSAGES, bridge.getMessageCount());
       } finally {
          if (bridge != null) {
             bridge.stop();

Reply via email to