This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 91fd12ad1f ARTEMIS-4480: rationalise openwire session tx usage and
operation context usage, use completion callbacks to ensure exclusive consumers
are isolated
91fd12ad1f is described below
commit 91fd12ad1f1723e4fa536d4edd9548cac2275be2
Author: Gary Tully <[email protected]>
AuthorDate: Tue Oct 31 15:53:09 2023 +0000
ARTEMIS-4480: rationalise openwire session tx usage and operation context
usage, use completion callbacks to ensure exclusive consumers are isolated
---
.../core/protocol/openwire/OpenWireConnection.java | 121 +++--
.../core/protocol/openwire/amq/AMQSession.java | 8 +
.../artemis/core/persistence/OperationContext.java | 2 +-
.../impl/journal/OperationContextImpl.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 27 +-
.../core/server/impl/ServerSessionImpl.java | 29 +-
.../core/transaction/impl/TransactionImpl.java | 3 +-
.../impl/journal/OperationContextUnitTest.java | 41 ++
.../PrefetchRedeliveryCountOpenwireTest.java | 487 ++++++++++++++++++++-
.../openwire/amq/RedeliveryPolicyTest.java | 52 ++-
10 files changed, 691 insertions(+), 81 deletions(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 77a095ae63..ee895a51a1 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -682,18 +682,7 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
private void rollbackInProgressLocalTransactions() {
for (Transaction tx : txMap.values()) {
- AMQSession session = (AMQSession) tx.getProtocolData();
- if (session != null) {
- session.getCoreSession().resetTX(tx);
- try {
- session.getCoreSession().rollback(false);
- } catch (Exception expectedOnExistingOutcome) {
- } finally {
- session.getCoreSession().resetTX(null);
- }
- } else {
- tx.tryRollback();
- }
+ tx.tryRollback();
}
}
@@ -729,7 +718,7 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
for (SessionId sessionId : sessionIdMap.values()) {
AMQSession session = sessions.get(sessionId);
if (session != null) {
- session.close();
+ session.close(fail);
}
}
internalSession.close(false);
@@ -782,8 +771,6 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
recoverOperationContext();
- rollbackInProgressLocalTransactions();
-
if (me != null) {
//filter it like the other protocols
if (!(me instanceof ActiveMQRemoteDisconnectException)) {
@@ -796,6 +783,20 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
}
} catch (InvalidClientIDException e) {
logger.warn("Couldn't close connection because invalid clientID", e);
+ } finally {
+ // there may be some transactions not associated with sessions
+ // deal with them after sessions are removed via connection removal
+ operationContext.executeOnCompletion(new IOCallback() {
+ @Override
+ public void done() {
+ rollbackInProgressLocalTransactions();
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ rollbackInProgressLocalTransactions();
+ }
+ });
}
shutdown(true);
}
@@ -1338,7 +1339,11 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
@Override
public Response processRollbackTransaction(TransactionInfo info) throws
Exception {
- Transaction tx = lookupTX(info.getTransactionId(), null, true);
+ Transaction tx = lookupTX(info.getTransactionId(), null);
+
+ if (tx == null) {
+ throw new IllegalStateException("Transaction not started, " +
info.getTransactionId());
+ }
final AMQSession amqSession;
if (tx != null) {
@@ -1354,11 +1359,12 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
if (amqSession != null) {
amqSession.getCoreSession().resetTX(tx);
- try {
- returnReferences(tx, amqSession);
- } finally {
- amqSession.getCoreSession().resetTX(null);
- }
+ tx.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void beforeRollback(Transaction tx) throws Exception {
+ returnReferences(tx, amqSession);
+ }
+ });
}
}
@@ -1406,6 +1412,9 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
}
} else {
if (tx != null) {
+ // returnReferences() is only interested in acked messages
already in the tx, hence we bypass
+ // getCoreSession().rollback() logic for CORE which would add
any prefetched/delivered which have
+ // not yet been processed by the openwire client.
tx.rollback();
}
}
@@ -1490,18 +1499,46 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
@Override
public Response processBeginTransaction(TransactionInfo info) throws
Exception {
final TransactionId txID = info.getTransactionId();
-
try {
internalSession.resetTX(null);
if (txID.isXATransaction()) {
- Xid xid = OpenWireUtil.toXID(txID);
+ final Xid xid = OpenWireUtil.toXID(txID);
internalSession.xaStart(xid);
+ final ResourceManager resourceManager =
server.getResourceManager();
+ final Transaction transaction =
resourceManager.getTransaction(xid);
+ transaction.addOperation(new TransactionOperationAbstract() {
+ @Override
+ public void afterCommit(Transaction tx) {
+ removeFromResourceManager();
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ removeFromResourceManager();
+ }
+
+ private void removeFromResourceManager() {
+ try {
+ resourceManager.removeTransaction(xid,
getRemotingConnection());
+ } catch (ActiveMQException bestEffort) {
+ }
+ }
+ });
} else {
- Transaction transaction = internalSession.newTransaction();
+ final Transaction transaction =
internalSession.newTransaction();
txMap.put(txID, transaction);
transaction.addOperation(new TransactionOperationAbstract() {
@Override
public void afterCommit(Transaction tx) {
+ removeFromTxMap();
+ }
+
+ @Override
+ public void afterRollback(Transaction tx) {
+ removeFromTxMap();
+ }
+
+ private void removeFromTxMap() {
txMap.remove(txID);
}
});
@@ -1520,7 +1557,11 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
private Response processCommit(TransactionInfo info, boolean onePhase)
throws Exception {
TransactionId txID = info.getTransactionId();
- Transaction tx = lookupTX(txID, null, true);
+ Transaction tx = lookupTX(txID, null);
+
+ if (tx == null) {
+ throw new IllegalStateException("Transaction not started, " +
txID);
+ }
if (txID.isXATransaction()) {
ResourceManager resourceManager = server.getResourceManager();
@@ -1557,7 +1598,13 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
}
} else {
if (tx != null) {
- tx.commit(onePhase);
+ AMQSession amqSession = (AMQSession) tx.getProtocolData();
+ if (amqSession != null) {
+ amqSession.getCoreSession().resetTX(tx);
+ amqSession.getCoreSession().commit();
+ } else {
+ tx.commit(true);
+ }
}
}
@@ -1630,8 +1677,6 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
logger.warn("Error during method invocation", e);
throw e;
}
- } else {
- txMap.remove(txID);
}
return null;
@@ -1705,8 +1750,6 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
sendException(e);
}
throw e;
- } finally {
- session.getCoreSession().resetTX(null);
}
return null;
@@ -1716,6 +1759,11 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
public Response processMessageAck(MessageAck ack) throws Exception {
AMQSession session = getSession(ack.getConsumerId().getParentId());
Transaction tx = lookupTX(ack.getTransactionId(), session);
+
+ if (ack.getTransactionId() != null && tx == null) {
+ throw new IllegalStateException("Transaction not started, " +
ack.getTransactionId());
+ }
+
session.getCoreSession().resetTX(tx);
try {
@@ -1725,8 +1773,6 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
}
- } finally {
- session.getCoreSession().resetTX(null);
}
return null;
}
@@ -1819,21 +1865,16 @@ public class OpenWireConnection extends
AbstractRemotingConnection implements Se
}
private Transaction lookupTX(TransactionId txID, AMQSession session) throws
Exception {
- return lookupTX(txID, session, false);
- }
-
- private Transaction lookupTX(TransactionId txID, AMQSession session,
boolean remove) throws Exception {
if (txID == null) {
return null;
}
- Xid xid = null;
Transaction transaction;
if (txID.isXATransaction()) {
- xid = OpenWireUtil.toXID(txID);
- transaction = remove ?
server.getResourceManager().removeTransaction(xid, this) :
server.getResourceManager().getTransaction(xid);
+ final Xid xid = OpenWireUtil.toXID(txID);
+ transaction = server.getResourceManager().getTransaction(xid);
} else {
- transaction = remove ? txMap.remove(txID) : txMap.get(txID);
+ transaction = txMap.get(txID);
}
if (transaction == null) {
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 9acfd5dd7a..2e582a8bbb 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -547,6 +547,14 @@ public class AMQSession implements SessionCallback {
this.coreSession.close(false);
}
+ @Override
+ public void close(boolean failed) {
+ try {
+ this.coreSession.close(failed);
+ } catch (Exception bestEffort) {
+ }
+ }
+
public OpenWireConnection getConnection() {
return connection;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
index 57ba7eca7a..dd7276fdfc 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/OperationContext.java
@@ -62,7 +62,7 @@ public interface OperationContext extends IOCompletion {
*/
boolean waitCompletion(long timeout) throws Exception;
- default void clear() {
+ default void reset() {
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
index ceab9c0c1f..a56f90a012 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextImpl.java
@@ -449,7 +449,7 @@ public class OperationContextImpl implements
OperationContext {
}
@Override
- public synchronized void clear() {
+ public synchronized void reset() {
stored = 0;
storeLineUpField = 0;
minimalReplicated = 0;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7f7cae750e..7b735e0daa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1534,7 +1534,23 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
if (consumer == exclusiveConsumer) {
- exclusiveConsumer = null;
+
+ // await context completion such that any delivered are returned
+ // to the queue. Preserve an ordered view for the next exclusive
+ // consumer
+ storageManager.afterCompleteOperations(new IOCallback() {
+
+ @Override
+ public void onError(final int errorCode, final String
errorMessage) {
+ releaseExclusiveConsumer();
+ }
+
+ @Override
+ public void done() {
+ releaseExclusiveConsumer();
+ }
+
+ });
}
groups.removeIf(consumer::equals);
@@ -1543,6 +1559,14 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
}
}
+ private void releaseExclusiveConsumer() {
+ synchronized (QueueImpl.this) {
+ exclusiveConsumer = null;
+ resetAllIterators();
+ }
+ deliverAsync();
+ }
+
private void stopDispatch() {
boolean stopped = dispatchingUpdater.compareAndSet(this,
BooleanUtil.toInt(true), BooleanUtil.toInt(false));
if (stopped) {
@@ -3136,7 +3160,6 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (holder == null) {
// this shouldn't happen, however I'm adding this check just
in case
logger.debug("consumers.next() returned null.");
- consumers.remove();
deliverAsync(true);
return false;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 73050766f7..41c25d25db 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -142,7 +142,7 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
protected final ServerProducers serverProducers;
- protected Transaction tx;
+ protected volatile Transaction tx;
/** This will store the Transaction between xaEnd and xaPrepare or xaCommit.
* in a failure scenario (client is gone), this will be held between xaEnd
and xaCommit. */
@@ -395,15 +395,10 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
callback.close(failed);
}
synchronized (this) {
- if (!closed) {
- if (server.hasBrokerSessionPlugins()) {
- server.callBrokerSessionPlugins(plugin ->
plugin.beforeCloseSession(this, failed));
- }
+ if (server.hasBrokerSessionPlugins()) {
+ server.callBrokerSessionPlugins(plugin ->
plugin.beforeCloseSession(this, failed));
}
this.setStarted(false);
- if (closed)
- return;
-
if (failed) {
Transaction txToRollback = tx;
@@ -432,7 +427,6 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
}
}
}
- closed = true;
}
//putting closing of consumers outside the sync block
@@ -653,7 +647,7 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
* Notice that we set autoCommitACK and autoCommitSends to true if tx ==
null
*/
@Override
- public void resetTX(Transaction transaction) {
+ public synchronized void resetTX(Transaction transaction) {
this.tx = transaction;
this.autoCommitAcks = transaction == null;
this.autoCommitSends = transaction == null;
@@ -1724,20 +1718,29 @@ public class ServerSessionImpl implements
ServerSession, FailureListener {
@Override
public void close(final boolean failed, final boolean force) {
- if (closed)
- return;
+ synchronized (this) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ }
if (force) {
- context.clear();
+ context.reset();
}
context.executeOnCompletion(new IOCallback() {
@Override
public void onError(int errorCode, String errorMessage) {
+ callDoClose();
}
@Override
public void done() {
+ callDoClose();
+ }
+
+ private void callDoClose() {
try {
doClose(failed);
} catch (Exception e) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
index 5256155f41..f619c289f5 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImpl.java
@@ -382,8 +382,7 @@ public class TransactionImpl implements Transaction {
// We will like to execute afterRollback and clear anything pending
ActiveMQServerLogger.LOGGER.failedToPerformRollback(e);
}
- // We want to make sure that nothing else gets done after the commit is
issued
- // this will eliminate any possibility or races
+ // We want to make sure that nothing else gets done after the rollback
is issued
final List<TransactionOperation> operationsToComplete = this.operations;
this.operations = null;
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
index 6e95d6a3b3..5b81a8153a 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/OperationContextUnitTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.persistence.impl.journal;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -27,6 +28,7 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.junit.Assert;
import org.junit.Test;
@@ -247,6 +249,45 @@ public class OperationContextUnitTest extends
ActiveMQTestBase {
}
}
+ @Test
+ public void testSequentialCompletionN() throws Exception {
+ ExecutorService executor =
Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory(getClass().getName()));
+ ConcurrentLinkedQueue<Long> completions = new ConcurrentLinkedQueue();
+ final int N = 500;
+ try {
+ final OperationContextImpl impl = new OperationContextImpl(new
OrderedExecutor(executor));
+
+ // pending work to queue completions till done
+ impl.storeLineUp();
+
+ for (long l = 0; l < N; l++) {
+ long finalL = l;
+ impl.executeOnCompletion(new IOCallback() {
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ }
+
+ @Override
+ public void done() {
+ completions.add(finalL);
+ }
+ });
+ }
+
+ impl.done();
+
+ Wait.assertEquals(N, ()-> completions.size());
+
+ for (long i = 0; i < N; i++) {
+ assertEquals("ordered", i, (long) completions.poll());
+ }
+
+ } finally {
+ executor.shutdownNow();
+ }
+ }
+
+
@Test
public void testErrorNotLostOnPageSyncError() throws Exception {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
index 1dc6105541..c1a563cdd4 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -26,6 +27,8 @@ import javax.jms.TextMessage;
import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -36,16 +39,25 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
+import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.LocalTransactionId;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.TransactionId;
+import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.failover.FailoverTransport;
+import org.apache.activemq.transport.tcp.TcpTransport;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -67,7 +79,14 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
// force send to dlq early
addressSettingsMap.put("exampleQueue", new
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
// force send to dlq late
- addressSettingsMap.put("exampleQueueTwo", new
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(4000));
+ addressSettingsMap.put("exampleQueueTwo", new
AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new
SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(-1));
+ }
+
+ @Override
+ protected void extraServerConfig(Configuration serverConfig) {
+ // useful to debug if there is some contention that causes a concurrent
rollback,
+ // when true, the rollback journal update uses the operation context and
the failure propagates
+ //serverConfig.setJournalSyncTransactional(true);
}
@Test(timeout = 60_000)
@@ -155,7 +174,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
TextMessage message = session.createTextMessage("This is a text
message");
- int numMessages = 2000;
+ int numMessages = 10000;
for (int i = 0; i < numMessages; i++) {
message.setIntProperty("SEQ", i);
producer.send(message);
@@ -163,7 +182,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
session.commit();
exConn.close();
- final int batch = 100;
+ final int batch = 200;
for (int i = 0; i < numMessages; i += batch) {
// connection per batch
exConn = exFact.createConnection();
@@ -172,7 +191,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
session = exConn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(queue);
- TextMessage messageReceived = null;
+ TextMessage messageReceived;
for (int j = 0; j < batch; j++) { // a small batch
messageReceived = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull("null @ i=" + i, messageReceived);
@@ -183,7 +202,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
session.commit();
// force a local socket close such that the broker sees an
exception on the connection and fails the consumer via close
-
((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
+
((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class).stop();
exConn.close();
}
} finally {
@@ -194,9 +213,429 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
}
@Test(timeout = 60_000)
+ public void testServerSideRollbackOnCloseOrder() throws Exception {
+
+ final ArrayList<Throwable> errors = new ArrayList<>();
+ SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+ this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+
+ Queue queue = new ActiveMQQueue(durableQueue.toString());
+
+ final ActiveMQConnectionFactory exFact = new
ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=10&maxReconnectAttempts=0&timeout=1000");
+ exFact.setWatchTopicAdvisories(false);
+ exFact.setConnectResponseTimeout(10000);
+
+ ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new
ActiveMQPrefetchPolicy();
+ prefetchPastMaxDeliveriesInLoop.setAll(2000);
+ exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
+
+ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ redeliveryPolicy.setRedeliveryDelay(0);
+ redeliveryPolicy.setMaximumRedeliveries(-1);
+ exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+ Connection exConn = exFact.createConnection();
+ exConn.start();
+
+ Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("This is a text
message");
+
+ int numMessages = 1000;
+
+ for (int i = 0; i < numMessages; i++) {
+ message.setIntProperty("SEQ", i);
+ producer.send(message);
+ }
+ session.commit();
+ exConn.close();
+
+ final int numConsumers = 2;
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ // consume under load
+ final int numLoadProducers = 2;
+ underLoad(numLoadProducers, ()-> {
+
+ // a bunch of concurrent batch consumers, expecting order
+ AtomicBoolean done = new AtomicBoolean(false);
+ AtomicInteger receivedCount = new AtomicInteger();
+ AtomicInteger inProgressBatch = new AtomicInteger();
+
+ final int batch = 100;
+
+ final ExecutorService commitExecutor =
Executors.newCachedThreadPool();
+
+ Runnable consumerTask = () -> {
+
+ Connection toCloseOnError = null;
+ while (!done.get() && receivedCount.get() < 20 * numMessages) {
+ try (Connection consumerConnection = exFact.createConnection())
{
+
+ toCloseOnError = consumerConnection;
+ ((ActiveMQConnection)
consumerConnection).setCloseTimeout(1); // so rollback on close won't block
after socket close exception
+
+ consumerConnection.start();
+
+ Session consumerConnectionSession =
consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer messageConsumer =
consumerConnectionSession.createConsumer(queue);
+ TextMessage messageReceived = null;
+
+ int i = 0;
+ for (; i < batch; i++) {
+ messageReceived = (TextMessage)
messageConsumer.receive(2000);
+ if (messageReceived == null) {
+ break;
+ }
+
+ receivedCount.incrementAndGet();
+ // need to infer batch from seq number and adjust -
client never gets commit response
+ int receivedSeq = messageReceived.getIntProperty("SEQ");
+ int currentBatch = (receivedSeq / batch);
+ if (i == 0) {
+ if (inProgressBatch.get() != currentBatch) {
+ if (inProgressBatch.get() + 1 == currentBatch) {
+ inProgressBatch.incrementAndGet(); // all good,
next batch
+ logger.info("@:" + receivedCount.get() + ",
current batch increment to: " + inProgressBatch.get() + ", Received Seq: " +
receivedSeq + ", Message: " + messageReceived);
+ } else {
+ // we have an order problem
+ done.set(true);
+ throw new AssertionError("@:" +
receivedCount.get() + ", batch out of sequence, expected: " +
(inProgressBatch.get() + 1) + ", but have: " + currentBatch + " @" +
receivedSeq + ", Message: " + messageReceived);
+ }
+ }
+ }
+ // verify within batch order
+ Assert.assertEquals("@:" + receivedCount.get() + " batch
out of order", ((long) batch * inProgressBatch.get()) + i, receivedSeq);
+ }
+
+ if (i != batch) {
+ continue;
+ }
+
+ // manual ack in tx to setup server for rollback work on fail
+ Transport transport = ((ActiveMQConnection)
consumerConnection).getTransport();
+ TransactionId txId = new
LocalTransactionId(((ActiveMQConnection)
consumerConnection).getConnectionInfo().getConnectionId(), receivedCount.get());
+ TransactionInfo tx = new
TransactionInfo(((ActiveMQConnection)
consumerConnection).getConnectionInfo().getConnectionId(), txId,
TransactionInfo.BEGIN);
+ transport.request(tx);
+ MessageAck ack = new MessageAck();
+ ActiveMQMessage mqMessage = (ActiveMQMessage)
messageReceived;
+ ack.setDestination(mqMessage.getDestination());
+ ack.setMessageID(mqMessage.getMessageId());
+ ack.setMessageCount(batch);
+ ack.setTransactionId(tx.getTransactionId());
+
ack.setConsumerId(((ActiveMQMessageConsumer)messageConsumer).getConsumerId());
+
+ transport.request(ack);
+
+ try {
+ // force a local socket close such that the broker sees
an exception on the connection and fails the consumer via serverConsumer close
+ ((ActiveMQConnection)
consumerConnection).getTransport().narrow(TcpTransport.class).stop();
+ } catch (Throwable expected) {
+ }
+
+ } catch (ConcurrentModificationException | NullPointerException
ignored) {
+ } catch (JMSException ignored) {
+ // expected on executor stop
+ } catch (Throwable unexpected) {
+ unexpected.printStackTrace();
+ errors.add(unexpected);
+ done.set(true);
+ } finally {
+ if (toCloseOnError != null) {
+ try {
+ toCloseOnError.close();
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+ };
+
+ for (int i = 0; i < numConsumers; i++) {
+ executorService.submit(consumerTask);
+ }
+ executorService.shutdown();
+
+
+ try {
+ assertTrue(executorService.awaitTermination(30, TimeUnit.SECONDS));
+ assertTrue(errors.isEmpty());
+ } catch (Throwable t) {
+ errors.add(t);
+ } finally {
+ done.set(true);
+ commitExecutor.shutdownNow();
+ executorService.shutdownNow();
+ }
+
+ Assert.assertTrue("errors: " + errors, errors.isEmpty());
+ });
+
+ Assert.assertTrue(errors.isEmpty());
+ }
+
+ @Test(timeout = 60_000)
+ public void testExclusiveConsumerBatchOrderUnderLoad() throws Exception {
+
+ final ArrayList<Throwable> errors = new ArrayList<>();
+ SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+ this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+
+ Queue queue = new ActiveMQQueue(durableQueue.toString());
+
+ final ActiveMQConnectionFactory exFact = new
ActiveMQConnectionFactory("failover:(tcp://localhost:61616?closeAsync=false)?startupMaxReconnectAttempts=10&maxReconnectAttempts=0&timeout=1000");
+ exFact.setWatchTopicAdvisories(false);
+ exFact.setConnectResponseTimeout(10000);
+
+ ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new
ActiveMQPrefetchPolicy();
+ prefetchPastMaxDeliveriesInLoop.setAll(2000);
+ exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);
+
+ RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ redeliveryPolicy.setRedeliveryDelay(0);
+ redeliveryPolicy.setMaximumRedeliveries(-1);
+ exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+ Connection exConn = exFact.createConnection();
+ exConn.start();
+
+ Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage message = session.createTextMessage("This is a text
message");
+
+ int numMessages = 10000;
+
+ for (int i = 0; i < numMessages; i++) {
+ message.setIntProperty("SEQ", i);
+ producer.send(message);
+ }
+ session.commit();
+ exConn.close();
+
+ final int numConsumers = 4;
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ // consume under load
+ final int numLoadProducers = 4;
+ underLoad(numLoadProducers, ()-> {
+
+ // a bunch of concurrent batch consumers, expecting order
+ AtomicBoolean done = new AtomicBoolean(false);
+ AtomicInteger receivedCount = new AtomicInteger();
+ AtomicInteger inProgressBatch = new AtomicInteger();
+
+ final int batch = 200;
+
+ final ExecutorService commitExecutor =
Executors.newCachedThreadPool();
+
+ Runnable consumerTask = () -> {
+
+ Connection toCloseOnError = null;
+ while (!done.get() &&
server.locateQueue(durableQueue).getMessageCount() > 0L) {
+ try (Connection consumerConnection = exFact.createConnection())
{
+
+ toCloseOnError = consumerConnection;
+ ((ActiveMQConnection)
consumerConnection).setCloseTimeout(1); // so rollback on close won't block
after socket close exception
+
+ consumerConnection.start();
+
+ Session consumerConnectionSession =
consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer messageConsumer =
consumerConnectionSession.createConsumer(queue);
+ TextMessage messageReceived;
+
+ int i = 0;
+ for (; i < batch; i++) {
+ messageReceived = (TextMessage)
messageConsumer.receive(2000);
+ if (messageReceived == null) {
+ break;
+ }
+
+ receivedCount.incrementAndGet();
+ // need to infer batch from seq number and adjust -
client never gets commit response
+ int receivedSeq = messageReceived.getIntProperty("SEQ");
+ int currentBatch = (receivedSeq / batch);
+ if (i == 0) {
+ if (inProgressBatch.get() != currentBatch) {
+ if (inProgressBatch.get() + 1 == currentBatch) {
+ inProgressBatch.incrementAndGet(); // all good,
next batch
+ logger.info("@:" + receivedCount.get() + ",
current batch increment to: " + inProgressBatch.get() + ", Received Seq: " +
receivedSeq + ", Message: " + messageReceived);
+ } else {
+ // we have an order problem
+ done.set(true);
+ throw new AssertionError("@:" +
receivedCount.get() + ", batch out of sequence, expected: " +
(inProgressBatch.get() + 1) + ", but have: " + currentBatch + " @" +
receivedSeq + ", Message: " + messageReceived);
+ }
+ }
+ }
+ // verify within batch order
+ Assert.assertEquals("@:" + receivedCount.get() + " batch
out of order", ((long) batch * inProgressBatch.get()) + i, receivedSeq);
+ }
+
+ if (i != batch) {
+ continue;
+ }
+
+ // arrange concurrent commit - ack/commit of batch
+ // with server side error, potential for ack/commit and
close-on-fail to contend
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Session finalSession = consumerConnectionSession;
+ commitExecutor.submit(() -> {
+ try {
+ latch.countDown();
+ finalSession.commit();
+
+ } catch (Throwable expected) {
+ }
+ });
+
+ latch.await(1, TimeUnit.SECONDS);
+
+ // give a chance to have a batch complete to make progress!
+ TimeUnit.MILLISECONDS.sleep(15);
+
+ try {
+ // force a local socket close such that the broker sees
an exception on the connection and fails the consumer via serverConsumer close
+ ((ActiveMQConnection)
consumerConnection).getTransport().narrow(TcpTransport.class).stop();
+ } catch (Throwable expected) {
+ }
+
+ } catch (InterruptedException | ConcurrentModificationException
| NullPointerException ignored) {
+ } catch (JMSException ignored) {
+ // expected on executor stop
+ } catch (Throwable unexpected) {
+ unexpected.printStackTrace();
+ errors.add(unexpected);
+ done.set(true);
+ } finally {
+ if (toCloseOnError != null) {
+ try {
+ toCloseOnError.close();
+ } catch (Throwable ignored) {
+ }
+ }
+ }
+ }
+ };
+
+ for (int i = 0; i < numConsumers; i++) {
+ executorService.submit(consumerTask);
+ }
+ executorService.shutdown();
+
+ try {
+ Wait.assertEquals(0L, () -> {
+ if (!errors.isEmpty()) {
+ return -1;
+ }
+ return server.locateQueue(durableQueue).getMessageCount();
+ }, 30 * 1000);
+ } catch (Throwable t) {
+ errors.add(t);
+ } finally {
+ done.set(true);
+ commitExecutor.shutdownNow();
+ executorService.shutdownNow();
+ }
+
+ Assert.assertTrue(errors.isEmpty());
+ });
+
+ Assert.assertTrue(errors.isEmpty());
+ }
+
+ public void underLoad(final int numProducers, Runnable r) throws Exception {
+ // produce some load with a producer(s)/consumer
+ SimpleString durableQueue = new SimpleString("exampleQueue");
+ this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST));
+
+ ExecutorService executor = Executors.newFixedThreadPool(numProducers +
1);
+
+ Queue queue;
+ ConnectionFactory cf;
+ boolean useCoreForLoad = true;
+
+ if (useCoreForLoad) {
+ org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
connectionFactory = new
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
+ connectionFactory.setConfirmationWindowSize(1000000);
+ connectionFactory.setBlockOnDurableSend(true);
+ connectionFactory.setBlockOnNonDurableSend(true);
+ cf = connectionFactory;
+
+ queue =
connectionFactory.createContext().createQueue(durableQueue.toString());
+
+ } else {
+ ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?startupMaxReconnectAttempts=0&maxReconnectAttempts=0&timeout=1000");
+ connectionFactory.setWatchTopicAdvisories(false);
+ connectionFactory.setCloseTimeout(1);
+ connectionFactory.setSendTimeout(2000);
+
+ cf = connectionFactory;
+ queue = new ActiveMQQueue(durableQueue.toString());
+ }
+
+
+ final Queue destination = queue;
+ final ConnectionFactory connectionFactory = cf;
+ final AtomicBoolean done = new AtomicBoolean();
+ Runnable producerTask = ()-> {
+
+ try (Connection exConn = connectionFactory.createConnection()) {
+
+ exConn.start();
+
+ final Session session = exConn.createSession(true,
Session.SESSION_TRANSACTED);
+ final MessageProducer producer =
session.createProducer(destination);
+ final TextMessage message = session.createTextMessage("This is a
text message");
+
+ int count = 1;
+ while (!done.get()) {
+ producer.send(message);
+ if ((count++ % 100) == 0) {
+ session.commit();
+ }
+ }
+ } catch (Exception ignored) {
+ }
+ };
+
+ for (int i = 0; i < numProducers; i++) {
+ executor.submit(producerTask);
+ }
+ // one consumer
+ executor.submit(()-> {
+
+ try (Connection exConn = connectionFactory.createConnection()) {
+ exConn.start();
+
+ Session session = exConn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer =
session.createConsumer(destination);
+
+ while (!done.get()) {
+ messageConsumer.receive(200);
+ }
+ } catch (Exception ignored) {
+ }
+ });
+
+
+ try {
+ r.run();
+ } finally {
+ done.set(true);
+ executor.shutdown();
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ logger.info("LOAD ADDED: " +
server.locateQueue(durableQueue).getMessagesAdded());
+ }
+ }
+
+ @Test(timeout = 120_000)
public void
testExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws
Exception {
+ doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch();
+ }
+
+ public void
doTestExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws
Exception {
Connection exConn = null;
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
SimpleString durableQueue = new SimpleString("exampleQueueTwo");
this.server.createQueue(new
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
AtomicInteger batchConsumed = new AtomicInteger(0);
@@ -221,8 +660,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
TextMessage message = session.createTextMessage("This is a text
message");
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- int numMessages = 600;
+ int numMessages = 1000;
for (int i = 0; i < numMessages; i++) {
message.setIntProperty("SEQ", i);
producer.send(message);
@@ -230,7 +668,7 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
session.close();
exConn.close();
- final int batch = numMessages;
+ final int batch = 200;
AtomicBoolean done = new AtomicBoolean(false);
while (!done.get()) {
// connection per batch attempt
@@ -242,42 +680,53 @@ public class PrefetchRedeliveryCountOpenwireTest extends
OpenWireTestBase {
session = exConn.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumer = session.createConsumer(queue);
- TextMessage messageReceived = null;
+ TextMessage messageReceived;
+ int received = 0;
for (int j = 0; j < batch; j++) {
messageReceived = (TextMessage) messageConsumer.receive(2000);
if (messageReceived == null) {
done.set(true);
break;
}
+ received++;
batchConsumed.incrementAndGet();
assertEquals("This is a text message",
messageReceived.getText());
+
+ int receivedSeq = messageReceived.getIntProperty("SEQ");
+ // need to infer batch from seq number and adjust - client
never gets commit response
+ Assert.assertEquals("@:" + received + ", out of order", (batch
* (receivedSeq / batch)) + j, receivedSeq);
}
// arrange concurrent commit - ack/commit
// with server side error, potential for ack/commit and
close-on-fail to contend
final CountDownLatch latch = new CountDownLatch(1);
Session finalSession = session;
- executorService.submit(new Runnable() {
- @Override
- public void run() {
- try {
- latch.countDown();
- finalSession.commit();
+ executorService.submit(() -> {
+ try {
+ latch.countDown();
+ finalSession.commit();
- } catch (JMSException e) {
- }
+ } catch (JMSException ignored) {
}
});
latch.await(1, TimeUnit.SECONDS);
// force a local socket close such that the broker sees an
exception on the connection and fails the consumer via serverConsumer close
- ((FailoverTransport) ((org.apache.activemq.ActiveMQConnection)
exConn).getTransport().narrow(FailoverTransport.class)).stop();
- exConn.close();
+ ((org.apache.activemq.ActiveMQConnection)
exConn).getTransport().narrow(FailoverTransport.class).stop();
+ // retry asap, not waiting for client close
+ final Connection finalConToClose = exConn;
+ executorService.submit(() -> {
+ try {
+ finalConToClose.close();
+ } catch (JMSException ignored) {
+ }
+ });
}
} finally {
if (exConn != null) {
exConn.close();
}
+ executorService.shutdownNow();
}
logger.info("Done after: {}, queue: {}", batchConsumed.get(),
server.locateQueue(durableQueue));
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
index cbdda46d69..82a2229411 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/RedeliveryPolicyTest.java
@@ -413,7 +413,7 @@ public class RedeliveryPolicyTest extends BasicOpenWireTest
{
connection.getPrefetchPolicy().setAll(prefetchSize);
connection.start();
- Session session = connection.createSession(true,
Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
ActiveMQQueue destination = new ActiveMQQueue("TEST");
this.makeSureCoreQueueExist("TEST");
@@ -433,13 +433,13 @@ public class RedeliveryPolicyTest extends
BasicOpenWireTest {
for (int i = 0; i < messageCount; i++) {
m = consumer.receive(2000);
- assertNotNull(m);
+ assertNotNull("null@:" + i, m);
if (i == 3) {
session.rollback();
continue;
}
session.commit();
- assertTrue(queueControl.getDeliveringCount() <= prefetchSize);
+ assertTrue(queueControl.getDeliveringCount() <= prefetchSize + 1);
}
m = consumer.receive(2000);
@@ -448,6 +448,52 @@ public class RedeliveryPolicyTest extends
BasicOpenWireTest {
}
+ /**
+ * @throws Exception
+ */
+ @Test
+ public void testCanRollbackPastPrefetch() throws Exception {
+ final int prefetchSize = 10;
+ final int messageCount = 2 * prefetchSize;
+
+ connection.getPrefetchPolicy().setAll(prefetchSize);
+ connection.getRedeliveryPolicy().setMaximumRedeliveries(prefetchSize +
1);
+ connection.start();
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+
+ ActiveMQQueue destination = new ActiveMQQueue("TEST");
+ this.makeSureCoreQueueExist("TEST");
+
+ QueueControl queueControl = (QueueControl)server.getManagementService().
+ getResource(ResourceNames.QUEUE + "TEST");
+
+ MessageProducer producer = session.createProducer(destination);
+ for (int i = 0; i < messageCount; i++) {
+ producer.send(session.createTextMessage("MSG" + i));
+ session.commit();
+ }
+
+ Message m;
+ MessageConsumer consumer = session.createConsumer(destination);
+ Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(),
3000, 100);
+
+ // do prefetch num rollbacks
+ for (int i = 0; i < prefetchSize; i++) {
+ m = consumer.receive(2000);
+ assertNotNull("null@:" + i, m);
+ session.rollback();
+ }
+
+ // then try and consume
+ for (int i = 0; i < messageCount; i++) {
+ m = consumer.receive(2000);
+ assertNotNull("null@:" + i, m);
+ session.commit();
+
+ assertTrue("deliveryCount: " + queueControl.getDeliveringCount() + "
@:" + i, queueControl.getDeliveringCount() <= prefetchSize + 1);
+ }
+ }
+
/**
* @throws Exception
*/