AMQ-7067 - tidy up tests and add a prepare variant; limit rollback location
recording to the XA case. There is still some work to do for the ack compaction
case: it needs to be made aware of the tx records so that those are transferred
as necessary.

(cherry picked from commit 57c7939534a927bfc2d1b0454aac7ef8d804532b)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7fa85185
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7fa85185
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7fa85185

Branch: refs/heads/activemq-5.15.x
Commit: 7fa85185aae02414ff023727efc46680f5ead66a
Parents: cbe486f
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Oct 9 12:01:47 2018 +0100
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Oct 10 10:23:21 2018 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |   2 +-
 .../org/apache/activemq/bugs/AMQ7067Test.java   | 128 ++++++++++++++-----
 2 files changed, 99 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7fa85185/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 82c4865..86dfcac 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1437,7 +1437,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 updates = preparedTransactions.remove(key);
             }
         }
-        if (updates != null) {
+        if (key.isXATransaction() && updates != null) {
             for(Operation op : updates) {
                 recordAckMessageReferenceLocation(location, op.getLocation());
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7fa85185/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
index c1f34d0..d00ee41 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7067Test.java
@@ -14,12 +14,14 @@ import org.apache.activemq.broker.jmx.RecoveredXATransactionViewMBean;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.XATransactionId;
 import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.lang.StringUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import javax.jms.*;
+import javax.management.InstanceNotFoundException;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import javax.transaction.xa.XAException;
@@ -32,11 +34,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
+import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import static javax.transaction.xa.XAResource.*;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class AMQ7067Test {
 
@@ -82,11 +87,55 @@ public class AMQ7067Test {
         broker.start();
     }
 
+    @Test
+    public void testXAPrepare() throws Exception {
+
+        setupXAConnection();
+
+        Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
+
+        MessageProducer holdKahaDbProducer = 
xaSession.createProducer(holdKahaDb);
+
+        XATransactionId txid = createXATransaction();
+        System.out.println("****** create new txid = " + txid);
+        xaRes.start(txid, TMNOFLAGS);
+
+        TextMessage helloMessage = 
xaSession.createTextMessage(StringUtils.repeat("a", 10));
+        holdKahaDbProducer.send(helloMessage);
+        xaRes.end(txid, TMSUCCESS);
+
+        Queue queue = xaSession.createQueue("test");
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+
+        xaRes.prepare(txid);
+
+        produce(xaRes, xaSession, queue, 100, 512 * 1024);
+
+        ((org.apache.activemq.broker.region.Queue) 
broker.getRegionBroker().getDestinationMap().get(queue)).purge();
+
+        Xid[] xids = xaRes.recover(TMSTARTRSCAN);
+
+        //Should be 1 since we have only 1 prepared
+        assertEquals(1, xids.length);
+        connection.close();
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+
+        setupXAConnection();
+        xids = xaRes.recover(TMSTARTRSCAN);
+
+        System.out.println("****** recovered = " + xids);
+
+        // THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION!
+        assertEquals(1, xids.length);
+    }
 
     @Test
-    public void testAMQ7067XAcommit() throws Exception {
+    public void testXAcommit() throws Exception {
 
-        PersistenceAdapterViewMBean kahadbView = 
getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
         setupXAConnection();
 
         Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
@@ -124,15 +173,14 @@ public class AMQ7067Test {
         setupXAConnection();
         xids = xaRes.recover(TMSTARTRSCAN);
 
-        // THIS SHOULD NOT FAIL AS THERE SHOUL DBE ONLY 1 TRANSACTION!
+        // THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION!
         assertEquals(1, xids.length);
 
     }
 
     @Test
-    public void testAMQ7067XArollback() throws Exception {
+    public void testXArollback() throws Exception {
 
-        PersistenceAdapterViewMBean kahadbView = 
getProxyToPersistenceAdapter(broker.getPersistenceAdapter().toString());
         setupXAConnection();
 
         Queue holdKahaDb = xaSession.createQueue("holdKahaDb");
@@ -176,11 +224,11 @@ public class AMQ7067Test {
     }
 
     @Test
-    public void testAMQ7067commit() throws Exception {
+    public void testCommit() throws Exception {
         final Connection connection = 
ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
         connection.start();
 
-        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
         Queue holdKahaDb = session.createQueue("holdKahaDb");
         MessageProducer holdKahaDbProducer = 
session.createProducer(holdKahaDb);
         TextMessage helloMessage = 
session.createTextMessage(StringUtils.repeat("a", 10));
@@ -192,14 +240,28 @@ public class AMQ7067Test {
 
         System.out.println(String.format("QueueSize %s: %d", 
holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
         purgeQueue(queue.getQueueName());
-        Thread.sleep(10000);
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
+            }
+        });
+
+        // force gc
+        broker.getPersistenceAdapter().checkpoint(true);
 
+
+        connection.close();
         curruptIndexFile(getDataDirectory());
 
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+        broker.waitUntilStarted();
 
         while(true) {
             try {
-                Thread.sleep(10000);
+                TimeUnit.SECONDS.sleep(1);
                 System.out.println(String.format("QueueSize %s: %d", 
holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
                 break;
             } catch (Exception ex) {
@@ -208,18 +270,16 @@ public class AMQ7067Test {
             }
         }
 
-        connection.close();
-
         // THIS SHOULD NOT FAIL AS THERE SHOULD BE ONLY 1 TRANSACTION!
         assertEquals(1, getQueueSize(holdKahaDb.getQueueName()));
     }
 
     @Test
-    public void testAMQ7067rollback() throws Exception {
+    public void testRollback() throws Exception {
         final Connection connection = 
ACTIVE_MQ_NON_XA_CONNECTION_FACTORY.createConnection();
         connection.start();
 
-        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
         Queue holdKahaDb = session.createQueue("holdKahaDb");
         MessageProducer holdKahaDbProducer = 
session.createProducer(holdKahaDb);
         TextMessage helloMessage = 
session.createTextMessage(StringUtils.repeat("a", 10));
@@ -231,26 +291,34 @@ public class AMQ7067Test {
 
         System.out.println(String.format("QueueSize %s: %d", 
holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
         purgeQueue(queue.getQueueName());
-        Thread.sleep(10000);
-
-        curruptIndexFile(getDataDirectory());
 
-
-        while(true) {
-            try {
-                Thread.sleep(10000);
-                System.out.println(String.format("QueueSize %s: %d", 
holdKahaDb.getQueueName(), getQueueSize(holdKahaDb.getQueueName())));
-                break;
-            } catch (Exception ex) {
-                System.out.println(ex.getMessage());
-                break;
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == getQueueSize(queue.getQueueName());
             }
-        }
+        });
+
+        // force gc
+        broker.getPersistenceAdapter().checkpoint(true);
 
         connection.close();
+        curruptIndexFile(getDataDirectory());
+
+        broker.stop();
+        broker.waitUntilStopped();
+        createBroker();
+        broker.waitUntilStarted();
 
-        // THIS SHOULD NOT FAIL AS THERE SHOULD ZERO TRANSACTION!
-        assertEquals(0, getQueueSize(holdKahaDb.getQueueName()));
+
+        // no sign of the test queue on recovery, rollback is the default for 
any inflight
+        // this test serves as a sanity check on existing behaviour
+        try {
+            getQueueSize(holdKahaDb.getQueueName());
+            fail("expect InstanceNotFoundException");
+        } catch (UndeclaredThrowableException expected) {
+            assertTrue(expected.getCause() instanceof 
InstanceNotFoundException);
+        }
     }
 
     protected static void createDanglingTransaction(XAResource xaRes, 
XASession xaSession, Queue queue) throws JMSException, IOException, XAException 
{
@@ -281,7 +349,7 @@ public class AMQ7067Test {
     }
 
     protected static void produce(Connection connection, Queue queue, int 
messageCount, int messageSize) throws JMSException, IOException, XAException {
-        Session session = connection.createSession(true, 
Session.CLIENT_ACKNOWLEDGE);
+        Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
         MessageProducer producer = session.createProducer(queue);
 
         for (int i = 0; i < messageCount; i++) {

Reply via email to