This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 91fd12ad1f ARTEMIS-4480: rationalise openwire session tx usage and operation context usage, use completion callbacks to ensure exclusive consumers are isolated
91fd12ad1f is described below

commit 91fd12ad1f1723e4fa536d4edd9548cac2275be2
Author: Gary Tully <[email protected]>
AuthorDate: Tue Oct 31 15:53:09 2023 +0000

    ARTEMIS-4480: rationalise openwire session tx usage and operation context usage, use completion callbacks to ensure exclusive consumers are isolated
---
 .../core/protocol/openwire/OpenWireConnection.java | 121 +++--
 .../core/protocol/openwire/amq/AMQSession.java     |   8 +
 .../artemis/core/persistence/OperationContext.java |   2 +-
 .../impl/journal/OperationContextImpl.java         |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        |  27 +-
 .../core/server/impl/ServerSessionImpl.java        |  29 +-
 .../core/transaction/impl/TransactionImpl.java     |   3 +-
 .../impl/journal/OperationContextUnitTest.java     |  41 ++
 .../PrefetchRedeliveryCountOpenwireTest.java       | 487 ++++++++++++++++++++-
 .../openwire/amq/RedeliveryPolicyTest.java         |  52 ++-
 10 files changed, 691 insertions(+), 81 deletions(-)

diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 77a095ae63..ee895a51a1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -682,18 +682,7 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
    private void rollbackInProgressLocalTransactions() {
 
       for (Transaction tx : txMap.values()) {
-         AMQSession session = (AMQSession) tx.getProtocolData();
-         if (session != null) {
-            session.getCoreSession().resetTX(tx);
-            try {
-               session.getCoreSession().rollback(false);
-            } catch (Exception expectedOnExistingOutcome) {
-            } finally {
-               session.getCoreSession().resetTX(null);
-            }
-         } else {
-            tx.tryRollback();
-         }
+         tx.tryRollback();
       }
    }
 
@@ -729,7 +718,7 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
          for (SessionId sessionId : sessionIdMap.values()) {
             AMQSession session = sessions.get(sessionId);
             if (session != null) {
-               session.close();
+               session.close(fail);
             }
          }
          internalSession.close(false);
@@ -782,8 +771,6 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
       recoverOperationContext();
 
-      rollbackInProgressLocalTransactions();
-
       if (me != null) {
          //filter it like the other protocols
          if (!(me instanceof ActiveMQRemoteDisconnectException)) {
@@ -796,6 +783,20 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
          }
       } catch (InvalidClientIDException e) {
          logger.warn("Couldn't close connection because invalid clientID", e);
+      } finally {
+         // there may be some transactions not associated with sessions
+         // deal with them after sessions are removed via connection removal
+         operationContext.executeOnCompletion(new IOCallback() {
+            @Override
+            public void done() {
+               rollbackInProgressLocalTransactions();
+            }
+
+            @Override
+            public void onError(int errorCode, String errorMessage) {
+               rollbackInProgressLocalTransactions();
+            }
+         });
       }
       shutdown(true);
    }
@@ -1338,7 +1339,11 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
 
       @Override
       public Response processRollbackTransaction(TransactionInfo info) throws 
Exception {
-         Transaction tx = lookupTX(info.getTransactionId(), null, true);
+         Transaction tx = lookupTX(info.getTransactionId(), null);
+
+         if (tx == null) {
+            throw new IllegalStateException("Transaction not started, " + 
info.getTransactionId());
+         }
 
          final AMQSession amqSession;
          if (tx != null) {
@@ -1354,11 +1359,12 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
             if (amqSession != null) {
                amqSession.getCoreSession().resetTX(tx);
 
-               try {
-                  returnReferences(tx, amqSession);
-               } finally {
-                  amqSession.getCoreSession().resetTX(null);
-               }
+               tx.addOperation(new TransactionOperationAbstract() {
+                  @Override
+                  public void beforeRollback(Transaction tx) throws Exception {
+                     returnReferences(tx, amqSession);
+                  }
+               });
             }
          }
 
@@ -1406,6 +1412,9 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
             }
          } else {
             if (tx != null) {
+               // returnReferences() is only interested in acked messages 
already in the tx, hence we bypass
+               // getCoreSession().rollback() logic for CORE which would add 
any prefetched/delivered which have
+               // not yet been processed by the openwire client.
                tx.rollback();
             }
          }
@@ -1490,18 +1499,46 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       @Override
       public Response processBeginTransaction(TransactionInfo info) throws 
Exception {
          final TransactionId txID = info.getTransactionId();
-
          try {
             internalSession.resetTX(null);
             if (txID.isXATransaction()) {
-               Xid xid = OpenWireUtil.toXID(txID);
+               final Xid xid = OpenWireUtil.toXID(txID);
                internalSession.xaStart(xid);
+               final ResourceManager resourceManager = 
server.getResourceManager();
+               final Transaction transaction = 
resourceManager.getTransaction(xid);
+               transaction.addOperation(new TransactionOperationAbstract() {
+                  @Override
+                  public void afterCommit(Transaction tx) {
+                     removeFromResourceManager();
+                  }
+
+                  @Override
+                  public void afterRollback(Transaction tx) {
+                     removeFromResourceManager();
+                  }
+
+                  private void removeFromResourceManager() {
+                     try {
+                        resourceManager.removeTransaction(xid, 
getRemotingConnection());
+                     } catch (ActiveMQException bestEffort) {
+                     }
+                  }
+               });
             } else {
-               Transaction transaction = internalSession.newTransaction();
+               final Transaction transaction = 
internalSession.newTransaction();
                txMap.put(txID, transaction);
                transaction.addOperation(new TransactionOperationAbstract() {
                   @Override
                   public void afterCommit(Transaction tx) {
+                     removeFromTxMap();
+                  }
+
+                  @Override
+                  public void afterRollback(Transaction tx) {
+                     removeFromTxMap();
+                  }
+
+                  private void removeFromTxMap() {
                      txMap.remove(txID);
                   }
                });
@@ -1520,7 +1557,11 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       private Response processCommit(TransactionInfo info, boolean onePhase) 
throws Exception {
          TransactionId txID = info.getTransactionId();
 
-         Transaction tx = lookupTX(txID, null, true);
+         Transaction tx = lookupTX(txID, null);
+
+         if (tx == null) {
+            throw new IllegalStateException("Transaction not started, " + 
txID);
+         }
 
          if (txID.isXATransaction()) {
             ResourceManager resourceManager = server.getResourceManager();
@@ -1557,7 +1598,13 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
             }
          } else {
             if (tx != null) {
-               tx.commit(onePhase);
+               AMQSession amqSession = (AMQSession) tx.getProtocolData();
+               if (amqSession != null) {
+                  amqSession.getCoreSession().resetTX(tx);
+                  amqSession.getCoreSession().commit();
+               } else {
+                  tx.commit(true);
+               }
             }
          }
 
@@ -1630,8 +1677,6 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
                logger.warn("Error during method invocation", e);
                throw e;
             }
-         } else {
-            txMap.remove(txID);
          }
 
          return null;
@@ -1705,8 +1750,6 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
                sendException(e);
             }
             throw e;
-         } finally {
-            session.getCoreSession().resetTX(null);
          }
 
          return null;
@@ -1716,6 +1759,11 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       public Response processMessageAck(MessageAck ack) throws Exception {
          AMQSession session = getSession(ack.getConsumerId().getParentId());
          Transaction tx = lookupTX(ack.getTransactionId(), session);
+
+         if (ack.getTransactionId() != null && tx == null) {
+            throw new IllegalStateException("Transaction not started, " + 
ack.getTransactionId());
+         }
+
          session.getCoreSession().resetTX(tx);
 
          try {
@@ -1725,8 +1773,6 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
             if (tx != null) {
                tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
             }
-         } finally {
-            session.getCoreSession().resetTX(null);
          }
          return null;
       }
@@ -1819,21 +1865,16 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
    }
 
    private Transaction lookupTX(TransactionId txID, AMQSession session) throws 
Exception {
-      return lookupTX(txID, session, false);
-   }
-
-   private Transaction lookupTX(TransactionId txID, AMQSession session, 
boolean remove) throws Exception {
       if (txID == null) {
          return null;
       }
 
-      Xid xid = null;
       Transaction transaction;
       if (txID.isXATransaction()) {
-         xid = OpenWireUtil.toXID(txID);
-         transaction = remove ? 
server.getResourceManager().removeTransaction(xid, this) : 
server.getResourceManager().getTransaction(xid);
+         final Xid xid = OpenWireUtil.toXID(txID);
+         transaction = server.getResourceManager().getTransaction(xid);
       } else {
-         transaction = remove ? txMap.remove(txID) : txMap.get(txID);
+         transaction = txMap.get(txID);
       }
 
       if (transaction == null) {
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 9acfd5dd7a..2e582a8bbb 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -547,6 +547,14 @@ public class AMQSession implements SessionCallback {
       this.coreSession.close(false);
    }
 
+   @Override
+   public void close(boolean failed) {
+      try {
+         this.coreSession.close(failed);
+      } catch (Exception bestEffort) {
+      }
+   }
+
    public OpenWireConnection getConnection() {
       return connection;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
index 57ba7eca7a..dd7276fdfc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
@@ -62,7 +62,7 @@ public interface OperationContext extends IOCompletion {
     */
    boolean waitCompletion(long timeout) throws Exception;
 
-   default void clear() {
+   default void reset() {
 
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index ceab9c0c1f..a56f90a012 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -449,7 +449,7 @@ public class OperationContextImpl implements 
OperationContext {
    }
 
    @Override
-   public synchronized void clear() {
+   public synchronized void reset() {
       stored = 0;
       storeLineUpField = 0;
       minimalReplicated = 0;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7f7cae750e..7b735e0daa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1534,7 +1534,23 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
             }
 
             if (consumer == exclusiveConsumer) {
-               exclusiveConsumer = null;
+
+               // await context completion such that any delivered are returned
+               // to the queue. Preserve an ordered view for the next exclusive
+               // consumer
+               storageManager.afterCompleteOperations(new IOCallback() {
+
+                  @Override
+                  public void onError(final int errorCode, final String 
errorMessage) {
+                     releaseExclusiveConsumer();
+                  }
+
+                  @Override
+                  public void done() {
+                     releaseExclusiveConsumer();
+                  }
+
+               });
             }
 
             groups.removeIf(consumer::equals);
@@ -1543,6 +1559,14 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
       }
    }
 
+   private void releaseExclusiveConsumer() {
+      synchronized (QueueImpl.this) {
+         exclusiveConsumer = null;
+         resetAllIterators();
+      }
+      deliverAsync();
+   }
+
    private void stopDispatch() {
       boolean stopped = dispatchingUpdater.compareAndSet(this, 
BooleanUtil.toInt(true), BooleanUtil.toInt(false));
       if (stopped) {
@@ -3136,7 +3160,6 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
                if (holder == null) {
                   // this shouldn't happen, however I'm adding this check just 
in case
                   logger.debug("consumers.next() returned null.");
-                  consumers.remove();
                   deliverAsync(true);
                   return false;
                }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 73050766f7..41c25d25db 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -142,7 +142,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    protected final ServerProducers serverProducers;
 
-   protected Transaction tx;
+   protected volatile Transaction tx;
 
    /** This will store the Transaction between xaEnd and xaPrepare or xaCommit.
     *  in a failure scenario (client is gone), this will be held between xaEnd 
and xaCommit. */
@@ -395,15 +395,10 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
          callback.close(failed);
       }
       synchronized (this) {
-         if (!closed) {
-            if (server.hasBrokerSessionPlugins()) {
-               server.callBrokerSessionPlugins(plugin -> 
plugin.beforeCloseSession(this, failed));
-            }
+         if (server.hasBrokerSessionPlugins()) {
+            server.callBrokerSessionPlugins(plugin -> 
plugin.beforeCloseSession(this, failed));
          }
          this.setStarted(false);
-         if (closed)
-            return;
-
          if (failed) {
 
             Transaction txToRollback = tx;
@@ -432,7 +427,6 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
                }
             }
          }
-         closed = true;
       }
 
       //putting closing of consumers outside the sync block
@@ -653,7 +647,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
     * Notice that we set autoCommitACK and autoCommitSends to true if tx == 
null
     */
    @Override
-   public void resetTX(Transaction transaction) {
+   public synchronized void resetTX(Transaction transaction) {
       this.tx = transaction;
       this.autoCommitAcks = transaction == null;
       this.autoCommitSends = transaction == null;
@@ -1724,20 +1718,29 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
 
    @Override
    public void close(final boolean failed, final boolean force) {
-      if (closed)
-         return;
+      synchronized (this) {
+         if (closed) {
+            return;
+         }
+         closed = true;
+      }
 
       if (force) {
-         context.clear();
+         context.reset();
       }
 
       context.executeOnCompletion(new IOCallback() {
          @Override
          public void onError(int errorCode, String errorMessage) {
+            callDoClose();
          }
 
          @Override
          public void done() {
+            callDoClose();
+         }
+
+         private void callDoClose() {
             try {
                doClose(failed);
             } catch (Exception e) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 5256155f41..f619c289f5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -382,8 +382,7 @@ public class TransactionImpl implements Transaction {
          // We will like to execute afterRollback and clear anything pending
          ActiveMQServerLogger.LOGGER.failedToPerformRollback(e);
       }
-      // We want to make sure that nothing else gets done after the commit is 
issued
-      // this will eliminate any possibility or races
+      // We want to make sure that nothing else gets done after the rollback 
is issued
       final List<TransactionOperation> operationsToComplete = this.operations;
       this.operations = null;
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
index 6e95d6a3b3..5b81a8153a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.persistence.impl.journal;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -27,6 +28,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -247,6 +249,45 @@ public class OperationContextUnitTest extends 
ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testSequentialCompletionN() throws Exception {
+      ExecutorService executor = 
Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
+      ConcurrentLinkedQueue<Long> completions = new ConcurrentLinkedQueue();
+      final int N = 500;
+      try {
+         final OperationContextImpl impl = new OperationContextImpl(new 
OrderedExecutor(executor));
+
+         // pending work to queue completions till done
+         impl.storeLineUp();
+
+         for (long l = 0; l < N; l++) {
+            long finalL = l;
+            impl.executeOnCompletion(new IOCallback() {
+               @Override
+               public void onError(int errorCode, String errorMessage) {
+               }
+
+               @Override
+               public void done() {
+                  completions.add(finalL);
+               }
+            });
+         }
+
+         impl.done();
+
+         Wait.assertEquals(N, ()-> completions.size());
+
+         for (long i = 0; i < N; i++) {
+            assertEquals("ordered", i, (long) completions.poll());
+         }
+
+      } finally {
+         executor.shutdownNow();
+      }
+   }
+
+
    @Test
    public void testErrorNotLostOnPageSyncError() throws Exception {
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
index 1dc6105541..c1a563cdd4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.openwire;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -26,6 +27,8 @@ import javax.jms.TextMessage;
 
 import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -36,16 +39,25 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.cli.commands.tools.PrintData;
+import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.failover.FailoverTransport;
+import org.apache.activemq.transport.tcp.TcpTransport;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -67,7 +79,14 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
       // force send to dlq early
       addressSettingsMap.put("exampleQueue", new 
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
 
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
       // force send to dlq late
-      addressSettingsMap.put("exampleQueueTwo", new 
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
 
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(4000));
+      addressSettingsMap.put("exampleQueueTwo", new 
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
 
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(-1));
+   }
+
+   @Override
+   protected void extraServerConfig(Configuration serverConfig) {
+      // useful to debug if there is some contention that causes a concurrent 
rollback,
+      // when true, the rollback journal update uses the operation context and 
the failure propagates
+      //serverConfig.setJournalSyncTransactional(true);
    }
 
    @Test(timeout = 60_000)
@@ -155,7 +174,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
 
          TextMessage message = session.createTextMessage("This is a text 
message");
 
-         int numMessages = 2000;
+         int numMessages = 10000;
          for (int i = 0; i < numMessages; i++) {
             message.setIntProperty("SEQ", i);
             producer.send(message);
@@ -163,7 +182,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
          session.commit();
          exConn.close();
 
-         final int batch = 100;
+         final int batch = 200;
          for (int i = 0; i < numMessages; i += batch) {
             // connection per batch
             exConn = exFact.createConnection();
@@ -172,7 +191,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
             session = exConn.createSession(true, Session.SESSION_TRANSACTED);
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
-            TextMessage messageReceived = null;
+            TextMessage messageReceived;
             for (int j = 0; j < batch; j++) { // a small batch
                messageReceived = (TextMessage) messageConsumer.receive(5000);
                Assert.assertNotNull("null @ i=" + i, messageReceived);
@@ -183,7 +202,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
             session.commit();
 
             // force a local socket close such that the broker sees an 
exception on the connection and fails the consumer via close
-            
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
+            
((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class).stop();
             exConn.close();
          }
       } finally {
@@ -194,9 +213,429 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
    }
 
    @Test(timeout = 60_000)
+   public void testServerSideRollbackOnCloseOrder() throws Exception {
+      // Scenario: an exclusive queue is pre-loaded with messages carrying an increasing "SEQ" property.
+      // Concurrent consumers then loop: receive a full batch, ack it inside a transaction that is begun
+      // directly on the transport and never committed, and stop their own socket. Each batch a consumer
+      // sees must be the batch in progress or the one directly after it, and must be in SEQ order.
+
+      // appended to by the consumer threads while this thread reads it, so it has to be thread safe
+      final java.util.List<Throwable> errors = new java.util.concurrent.CopyOnWriteArrayList<>();
+      SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+      this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+
+      Queue queue = new ActiveMQQueue(durableQueue.toString());
+
+      final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=10&maxReconnectAttempts=0&timeout=1000");
+      exFact.setWatchTopicAdvisories(false);
+      exFact.setConnectResponseTimeout(10000);
+
+      ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new ActiveMQPrefetchPolicy();
+      prefetchPastMaxDeliveriesInLoop.setAll(2000);
+      exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
+
+      RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+      redeliveryPolicy.setRedeliveryDelay(0);
+      redeliveryPolicy.setMaximumRedeliveries(-1);
+      exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+      final int numMessages = 1000;
+
+      // pre-load the queue in a single transaction; try-with-resources releases the connection
+      // even when a send or the commit fails
+      try (Connection exConn = exFact.createConnection()) {
+         exConn.start();
+
+         Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(queue);
+         TextMessage message = session.createTextMessage("This is a text message");
+
+         for (int i = 0; i < numMessages; i++) {
+            message.setIntProperty("SEQ", i);
+            producer.send(message);
+         }
+         session.commit();
+      }
+
+      final int numConsumers = 2;
+      ExecutorService executorService = Executors.newCachedThreadPool();
+      // consume under load
+      final int numLoadProducers = 2;
+      underLoad(numLoadProducers, () -> {
+
+         // a bunch of concurrent batch consumers, expecting order
+         AtomicBoolean done = new AtomicBoolean(false);
+         AtomicInteger receivedCount = new AtomicInteger();
+         AtomicInteger inProgressBatch = new AtomicInteger();
+
+         final int batch = 100;
+
+         Runnable consumerTask = () -> {
+
+            Connection toCloseOnError = null;
+            // stop on the first error, or once enough (re)deliveries have been observed
+            while (!done.get() && receivedCount.get() < 20 * numMessages) {
+               try (Connection consumerConnection = exFact.createConnection()) {
+
+                  toCloseOnError = consumerConnection;
+                  final ActiveMQConnection amqConnection = (ActiveMQConnection) consumerConnection;
+                  amqConnection.setCloseTimeout(1); // so rollback on close won't block after socket close exception
+
+                  consumerConnection.start();
+
+                  Session consumerConnectionSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+                  MessageConsumer messageConsumer = consumerConnectionSession.createConsumer(queue);
+                  TextMessage messageReceived = null;
+
+                  int i = 0;
+                  for (; i < batch; i++) {
+                     messageReceived = (TextMessage) messageConsumer.receive(2000);
+                     if (messageReceived == null) {
+                        break;
+                     }
+
+                     receivedCount.incrementAndGet();
+                     // need to infer batch from seq number and adjust - client never gets commit response
+                     int receivedSeq = messageReceived.getIntProperty("SEQ");
+                     int currentBatch = (receivedSeq / batch);
+                     if (i == 0) {
+                        if (inProgressBatch.get() != currentBatch) {
+                           if (inProgressBatch.get() + 1 == currentBatch) {
+                              inProgressBatch.incrementAndGet(); // all good, next batch
+                              logger.info("@:" + receivedCount.get() + ", current batch increment to: " + inProgressBatch.get() + ", Received Seq: " + receivedSeq + ", Message: " + messageReceived);
+                           } else {
+                              // we have an order problem
+                              done.set(true);
+                              throw new AssertionError("@:" + receivedCount.get() + ", batch out of sequence, expected: " + (inProgressBatch.get() + 1) + ", but have: " + currentBatch + " @" + receivedSeq + ", Message: " + messageReceived);
+                           }
+                        }
+                     }
+                     // verify within batch order
+                     Assert.assertEquals("@:" + receivedCount.get() + " batch out of order", ((long) batch * inProgressBatch.get()) + i, receivedSeq);
+                  }
+
+                  if (i != batch) {
+                     // incomplete batch: drop this connection and try again
+                     continue;
+                  }
+
+                  // manual ack in tx to setup server for rollback work on fail
+                  Transport transport = amqConnection.getTransport();
+                  TransactionId txId = new LocalTransactionId(amqConnection.getConnectionInfo().getConnectionId(), receivedCount.get());
+                  TransactionInfo tx = new TransactionInfo(amqConnection.getConnectionInfo().getConnectionId(), txId, TransactionInfo.BEGIN);
+                  transport.request(tx);
+                  // ack the whole batch up to the last received message inside that transaction
+                  MessageAck ack = new MessageAck();
+                  ActiveMQMessage mqMessage = (ActiveMQMessage) messageReceived;
+                  ack.setDestination(mqMessage.getDestination());
+                  ack.setMessageID(mqMessage.getMessageId());
+                  ack.setMessageCount(batch);
+                  ack.setTransactionId(tx.getTransactionId());
+                  ack.setConsumerId(((ActiveMQMessageConsumer) messageConsumer).getConsumerId());
+
+                  transport.request(ack);
+
+                  try {
+                     // force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close
+                     transport.narrow(TcpTransport.class).stop();
+                  } catch (Throwable expected) {
+                     // best effort, the socket is being torn down on purpose
+                  }
+
+               } catch (ConcurrentModificationException | NullPointerException ignored) {
+                  // NOTE(review): presumably client internals racing with the forced socket stop - confirm
+               } catch (JMSException ignored) {
+                  // expected on executor stop
+               } catch (Throwable unexpected) {
+                  logger.error("unexpected failure in batch consumer", unexpected);
+                  errors.add(unexpected);
+                  done.set(true);
+               } finally {
+                  if (toCloseOnError != null) {
+                     try {
+                        toCloseOnError.close();
+                     } catch (Throwable ignored) {
+                        // best effort, the connection may already be closed or broken
+                     }
+                  }
+               }
+            }
+         };
+
+         for (int i = 0; i < numConsumers; i++) {
+            executorService.submit(consumerTask);
+         }
+         executorService.shutdown();
+
+         try {
+            assertTrue(executorService.awaitTermination(30, TimeUnit.SECONDS));
+            assertTrue(errors.isEmpty());
+         } catch (Throwable t) {
+            errors.add(t);
+         } finally {
+            done.set(true);
+            executorService.shutdownNow();
+         }
+
+         Assert.assertTrue("errors: " + errors, errors.isEmpty());
+      });
+
+      Assert.assertTrue(errors.isEmpty());
+   }
+
+   /**
+    * Pre-loads an exclusive queue with messages carrying an increasing "SEQ" property, then has
+    * concurrent consumers repeatedly receive a full batch, commit it on a separate thread and stop
+    * their own socket shortly after, so that the commit and the connection failure can contend.
+    * Each batch a consumer sees must be the batch in progress or the one directly after it, in SEQ
+    * order, and the queue must drain within the wait period.
+    *
+    * @throws Exception on any unexpected broker or JMS failure
+    */
+   @Test(timeout = 60_000)
+   public void testExclusiveConsumerBatchOrderUnderLoad() throws Exception {
+
+      // appended to by the consumer threads while this thread reads it, so it has to be thread safe
+      final java.util.List<Throwable> errors = new java.util.concurrent.CopyOnWriteArrayList<>();
+      SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+      this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+
+      Queue queue = new ActiveMQQueue(durableQueue.toString());
+
+      final ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=10&maxReconnectAttempts=0&timeout=1000");
+      exFact.setWatchTopicAdvisories(false);
+      exFact.setConnectResponseTimeout(10000);
+
+      ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new ActiveMQPrefetchPolicy();
+      prefetchPastMaxDeliveriesInLoop.setAll(2000);
+      exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
+
+      RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+      redeliveryPolicy.setRedeliveryDelay(0);
+      redeliveryPolicy.setMaximumRedeliveries(-1);
+      exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+      final int numMessages = 10000;
+
+      // pre-load the queue in a single transaction; try-with-resources releases the connection
+      // even when a send or the commit fails
+      try (Connection exConn = exFact.createConnection()) {
+         exConn.start();
+
+         Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(queue);
+         TextMessage message = session.createTextMessage("This is a text message");
+
+         for (int i = 0; i < numMessages; i++) {
+            message.setIntProperty("SEQ", i);
+            producer.send(message);
+         }
+         session.commit();
+      }
+
+      final int numConsumers = 4;
+      ExecutorService executorService = Executors.newCachedThreadPool();
+      // consume under load
+      final int numLoadProducers = 4;
+      underLoad(numLoadProducers, () -> {
+
+         // a bunch of concurrent batch consumers, expecting order
+         AtomicBoolean done = new AtomicBoolean(false);
+         AtomicInteger receivedCount = new AtomicInteger();
+         AtomicInteger inProgressBatch = new AtomicInteger();
+
+         final int batch = 200;
+
+         final ExecutorService commitExecutor = Executors.newCachedThreadPool();
+
+         Runnable consumerTask = () -> {
+
+            Connection toCloseOnError = null;
+            // keep going until an error is flagged or the queue is drained
+            while (!done.get() && server.locateQueue(durableQueue).getMessageCount() > 0L) {
+               try (Connection consumerConnection = exFact.createConnection()) {
+
+                  toCloseOnError = consumerConnection;
+                  final ActiveMQConnection amqConnection = (ActiveMQConnection) consumerConnection;
+                  amqConnection.setCloseTimeout(1); // so rollback on close won't block after socket close exception
+
+                  consumerConnection.start();
+
+                  Session consumerConnectionSession = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+                  MessageConsumer messageConsumer = consumerConnectionSession.createConsumer(queue);
+                  TextMessage messageReceived;
+
+                  int i = 0;
+                  for (; i < batch; i++) {
+                     messageReceived = (TextMessage) messageConsumer.receive(2000);
+                     if (messageReceived == null) {
+                        break;
+                     }
+
+                     receivedCount.incrementAndGet();
+                     // need to infer batch from seq number and adjust - client never gets commit response
+                     int receivedSeq = messageReceived.getIntProperty("SEQ");
+                     int currentBatch = (receivedSeq / batch);
+                     if (i == 0) {
+                        if (inProgressBatch.get() != currentBatch) {
+                           if (inProgressBatch.get() + 1 == currentBatch) {
+                              inProgressBatch.incrementAndGet(); // all good, next batch
+                              logger.info("@:" + receivedCount.get() + ", current batch increment to: " + inProgressBatch.get() + ", Received Seq: " + receivedSeq + ", Message: " + messageReceived);
+                           } else {
+                              // we have an order problem
+                              done.set(true);
+                              throw new AssertionError("@:" + receivedCount.get() + ", batch out of sequence, expected: " + (inProgressBatch.get() + 1) + ", but have: " + currentBatch + " @" + receivedSeq + ", Message: " + messageReceived);
+                           }
+                        }
+                     }
+                     // verify within batch order
+                     Assert.assertEquals("@:" + receivedCount.get() + " batch out of order", ((long) batch * inProgressBatch.get()) + i, receivedSeq);
+                  }
+
+                  if (i != batch) {
+                     // incomplete batch: drop this connection and try again
+                     continue;
+                  }
+
+                  // arrange concurrent commit - ack/commit of batch
+                  // with server side error, potential for ack/commit and close-on-fail to contend
+                  final CountDownLatch latch = new CountDownLatch(1);
+                  final Session finalSession = consumerConnectionSession;
+                  commitExecutor.submit(() -> {
+                     try {
+                        latch.countDown();
+                        finalSession.commit();
+
+                     } catch (Throwable expected) {
+                        // the commit races with the socket stop below, so it is allowed to fail
+                     }
+                  });
+
+                  // only waits for the commit task to have started, not for the commit to finish
+                  latch.await(1, TimeUnit.SECONDS);
+
+                  // give a chance to have a batch complete to make progress!
+                  TimeUnit.MILLISECONDS.sleep(15);
+
+                  try {
+                     // force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close
+                     amqConnection.getTransport().narrow(TcpTransport.class).stop();
+                  } catch (Throwable expected) {
+                     // best effort, the socket is being torn down on purpose
+                  }
+
+               } catch (InterruptedException interrupted) {
+                  // interruption can only come from the pool being shut down: keep the flag and stop this worker
+                  Thread.currentThread().interrupt();
+                  return;
+               } catch (ConcurrentModificationException | NullPointerException ignored) {
+                  // NOTE(review): presumably client internals racing with the forced socket stop - confirm
+               } catch (JMSException ignored) {
+                  // expected on executor stop
+               } catch (Throwable unexpected) {
+                  logger.error("unexpected failure in batch consumer", unexpected);
+                  errors.add(unexpected);
+                  done.set(true);
+               } finally {
+                  if (toCloseOnError != null) {
+                     try {
+                        toCloseOnError.close();
+                     } catch (Throwable ignored) {
+                        // best effort, the connection may already be closed or broken
+                     }
+                  }
+               }
+            }
+         };
+
+         for (int i = 0; i < numConsumers; i++) {
+            executorService.submit(consumerTask);
+         }
+         executorService.shutdown();
+
+         try {
+            // fail fast (-1 never equals 0) as soon as a consumer has reported an error
+            Wait.assertEquals(0L, () -> {
+               if (!errors.isEmpty()) {
+                  return -1;
+               }
+               return server.locateQueue(durableQueue).getMessageCount();
+            }, 30 * 1000);
+         } catch (Throwable t) {
+            errors.add(t);
+         } finally {
+            done.set(true);
+            commitExecutor.shutdownNow();
+            executorService.shutdownNow();
+         }
+
+         Assert.assertTrue("errors: " + errors, errors.isEmpty());
+      });
+
+      Assert.assertTrue(errors.isEmpty());
+   }
+
+   /**
+    * Runs {@code r} on the calling thread while background clients keep a separate, non-exclusive
+    * queue ("exampleQueue") busy: {@code numProducers} transacted producers that commit every 100
+    * sends, plus one auto-acknowledge consumer. The load is stopped and its resources are released
+    * once {@code r} returns or throws.
+    *
+    * @param numProducers number of concurrent load producers to start
+    * @param r            the work to execute while the load is running
+    * @throws Exception if the load queue cannot be created or the shutdown wait is interrupted
+    */
+   public void underLoad(final int numProducers, Runnable r) throws Exception {
+      // produce some load with a producer(s)/consumer
+      SimpleString durableQueue = new SimpleString("exampleQueue");
+      this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
+
+      ExecutorService executor = Executors.newFixedThreadPool(numProducers + 1);
+
+      Queue queue;
+      ConnectionFactory cf;
+      // flip to false to generate the load with the OpenWire client instead of the core JMS client
+      boolean useCoreForLoad = true;
+
+      if (useCoreForLoad) {
+         org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
+         connectionFactory.setConfirmationWindowSize(1000000);
+         connectionFactory.setBlockOnDurableSend(true);
+         connectionFactory.setBlockOnNonDurableSend(true);
+         cf = connectionFactory;
+
+         // the context is only needed to build the destination, close it straight away
+         try (javax.jms.JMSContext context = connectionFactory.createContext()) {
+            queue = context.createQueue(durableQueue.toString());
+         }
+
+      } else {
+         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?startupMaxReconnectAttempts=0&maxReconnectAttempts=0&timeout=1000");
+         connectionFactory.setWatchTopicAdvisories(false);
+         connectionFactory.setCloseTimeout(1);
+         connectionFactory.setSendTimeout(2000);
+
+         cf = connectionFactory;
+         queue = new ActiveMQQueue(durableQueue.toString());
+      }
+
+
+      final Queue destination = queue;
+      final ConnectionFactory connectionFactory = cf;
+      final AtomicBoolean done = new AtomicBoolean();
+      Runnable producerTask = () -> {
+
+         try (Connection exConn = connectionFactory.createConnection()) {
+
+            exConn.start();
+
+            final Session session = exConn.createSession(true, Session.SESSION_TRANSACTED);
+            final MessageProducer producer = session.createProducer(destination);
+            final TextMessage message = session.createTextMessage("This is a text message");
+
+            int count = 1;
+            while (!done.get()) {
+               producer.send(message);
+               if ((count++ % 100) == 0) {
+                  session.commit();
+               }
+            }
+         } catch (Exception ignored) {
+            // the load is best effort, a failing producer simply stops producing
+         }
+      };
+
+      for (int i = 0; i < numProducers; i++) {
+         executor.submit(producerTask);
+      }
+      // one consumer
+      executor.submit(() -> {
+
+         try (Connection exConn = connectionFactory.createConnection()) {
+            exConn.start();
+
+            Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createConsumer(destination);
+
+            while (!done.get()) {
+               messageConsumer.receive(200);
+            }
+         } catch (Exception ignored) {
+            // the load is best effort, a failing consumer simply stops consuming
+         }
+      });
+
+
+      try {
+         r.run();
+      } finally {
+         done.set(true);
+         executor.shutdown();
+         if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+            executor.shutdownNow();
+         }
+         try {
+            logger.info("LOAD ADDED: " + server.locateQueue(durableQueue).getMessagesAdded());
+         } finally {
+            // release the factory when it owns closeable resources, even if the logging above fails
+            if (connectionFactory instanceof AutoCloseable) {
+               ((AutoCloseable) connectionFactory).close();
+            }
+         }
+      }
+   }
+
+   @Test(timeout = 120_000)
    public void 
testExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws 
Exception {
+      doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch();
+   }
+
+   public void 
doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws 
Exception {
       Connection exConn = null;
 
+      ExecutorService executorService = Executors.newFixedThreadPool(10);
       SimpleString durableQueue = new SimpleString("exampleQueueTwo");
       this.server.createQueue(new 
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
       AtomicInteger batchConsumed = new AtomicInteger(0);
@@ -221,8 +660,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
 
          TextMessage message = session.createTextMessage("This is a text 
message");
 
-         ExecutorService executorService = Executors.newSingleThreadExecutor();
-         int numMessages = 600;
+         int numMessages = 1000;
          for (int i = 0; i < numMessages; i++) {
             message.setIntProperty("SEQ", i);
             producer.send(message);
@@ -230,7 +668,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
          session.close();
          exConn.close();
 
-         final int batch = numMessages;
+         final int batch = 200;
          AtomicBoolean done = new AtomicBoolean(false);
          while (!done.get()) {
             // connection per batch attempt
@@ -242,42 +680,53 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
             session = exConn.createSession(true, Session.SESSION_TRANSACTED);
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
-            TextMessage messageReceived = null;
+            TextMessage messageReceived;
+            int received = 0;
             for (int j = 0; j < batch; j++) {
                messageReceived = (TextMessage) messageConsumer.receive(2000);
                if (messageReceived == null) {
                   done.set(true);
                   break;
                }
+               received++;
                batchConsumed.incrementAndGet();
                assertEquals("This is a text message", 
messageReceived.getText());
+
+               int receivedSeq = messageReceived.getIntProperty("SEQ");
+               // need to infer batch from seq number and adjust - client 
never gets commit response
+               Assert.assertEquals("@:" + received + ", out of order", (batch 
* (receivedSeq / batch)) + j, receivedSeq);
             }
 
             // arrange concurrent commit - ack/commit
             // with server side error, potential for ack/commit and 
close-on-fail to contend
             final CountDownLatch latch = new CountDownLatch(1);
             Session finalSession = session;
-            executorService.submit(new Runnable() {
-               @Override
-               public void run() {
-                  try {
-                     latch.countDown();
-                     finalSession.commit();
+            executorService.submit(() -> {
+               try {
+                  latch.countDown();
+                  finalSession.commit();
 
-                  } catch (JMSException e) {
-                  }
+               } catch (JMSException ignored) {
                }
             });
 
             latch.await(1, TimeUnit.SECONDS);
             // force a local socket close such that the broker sees an 
exception on the connection and fails the consumer via serverConsumer close
-            ((FailoverTransport) ((org.apache.activemq.ActiveMQConnection) 
exConn).getTransport().narrow(FailoverTransport.class)).stop();
-            exConn.close();
+            ((org.apache.activemq.ActiveMQConnection) 
exConn).getTransport().narrow(FailoverTransport.class).stop();
+            // retry asap, not waiting for client close
+            final Connection finalConToClose = exConn;
+            executorService.submit(() -> {
+               try {
+                  finalConToClose.close();
+               } catch (JMSException ignored) {
+               }
+            });
          }
       } finally {
          if (exConn != null) {
             exConn.close();
          }
+         executorService.shutdownNow();
       }
 
       logger.info("Done after: {}, queue: {}", batchConsumed.get(), 
server.locateQueue(durableQueue));
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index cbdda46d69..82a2229411 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -413,7 +413,7 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest 
{
 
       connection.getPrefetchPolicy().setAll(prefetchSize);
       connection.start();
-      Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+      Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
 
       ActiveMQQueue destination = new ActiveMQQueue("TEST");
       this.makeSureCoreQueueExist("TEST");
@@ -433,13 +433,13 @@ public class RedeliveryPolicyTest extends 
BasicOpenWireTest {
 
       for (int i = 0; i < messageCount; i++) {
          m = consumer.receive(2000);
-         assertNotNull(m);
+         assertNotNull("null@:" + i, m);
          if (i == 3) {
             session.rollback();
             continue;
          }
          session.commit();
-         assertTrue(queueControl.getDeliveringCount() <= prefetchSize);
+         assertTrue(queueControl.getDeliveringCount() <= prefetchSize + 1);
       }
 
       m = consumer.receive(2000);
@@ -448,6 +448,52 @@ public class RedeliveryPolicyTest extends 
BasicOpenWireTest {
 
    }
 
+   /**
+    * Sends twice the prefetch size in messages, rolls back once per prefetched message (with the
+    * maximum redeliveries set to prefetch size plus one) and then checks that all messages can still
+    * be consumed one commit at a time, with the queue's delivering count never exceeding the
+    * prefetch size by more than one.
+    *
+    * @throws Exception on any unexpected broker or JMS failure
+    */
+   @Test
+   public void testCanRollbackPastPrefetch() throws Exception {
+      final int prefetchSize = 10;
+      final int messageCount = 2 * prefetchSize;
+
+      connection.getPrefetchPolicy().setAll(prefetchSize);
+      connection.getRedeliveryPolicy().setMaximumRedeliveries(prefetchSize + 1);
+      connection.start();
+      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+
+      ActiveMQQueue destination = new ActiveMQQueue("TEST");
+      this.makeSureCoreQueueExist("TEST");
+
+      QueueControl queueControl = (QueueControl)server.getManagementService().
+         getResource(ResourceNames.QUEUE + "TEST");
+
+      // one transaction per message
+      MessageProducer producer = session.createProducer(destination);
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(session.createTextMessage("MSG" + i));
+         session.commit();
+      }
+
+      Message m;
+      MessageConsumer consumer = session.createConsumer(destination);
+      // wait for the consumer's prefetch to fill before starting to roll back
+      Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
+
+      // do prefetch num rollbacks
+      for (int i = 0; i < prefetchSize; i++) {
+         m = consumer.receive(2000);
+         assertNotNull("null@:" + i, m);
+         session.rollback();
+      }
+
+      // then try and consume
+      for (int i = 0; i < messageCount; i++) {
+         m = consumer.receive(2000);
+         assertNotNull("null@:" + i, m);
+         session.commit();
+
+         // sample once so the failure message shows exactly the value that was checked
+         final long deliveringCount = queueControl.getDeliveringCount();
+         assertTrue("deliveryCount: " + deliveringCount + " @:" + i, deliveringCount <= prefetchSize + 1);
+      }
+   }
+
    /**
     * @throws Exception
     */

Reply via email to