This is an automated email from the ASF dual-hosted git repository. gtully pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 8f9a72d257 ARTEMIS-4432 respect actor and operation context for openwire connection failure processing 8f9a72d257 is described below commit 8f9a72d257f6c75810a8d354006e1a6f015bf082 Author: Gary Tully <gary.tu...@gmail.com> AuthorDate: Mon Sep 18 16:16:43 2023 +0100 ARTEMIS-4432 respect actor and operation context for openwire connection failure processing --- .../artemis/utils/actors/ThresholdActor.java | 16 +++ .../artemis/utils/actors/ThresholdActorTest.java | 47 ++++++++ .../core/protocol/openwire/OpenWireConnection.java | 32 ++++- .../PrefetchRedeliveryCountOpenwireTest.java | 134 ++++++++++++++++++++- 4 files changed, 226 insertions(+), 3 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java index e0ff665173..4e7bbb6ded 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java @@ -41,6 +41,7 @@ public class ThresholdActor<T> extends ProcessorBase<Object> { private final ActorListener<T> listener; private final Runnable overThreshold; private final Runnable clearThreshold; + private volatile Runnable shutdownTask; public ThresholdActor(Executor parent, ActorListener<T> listener, int maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable clearThreshold) { super(parent); @@ -53,6 +54,10 @@ public class ThresholdActor<T> extends ProcessorBase<Object> { @Override protected final void doTask(Object task) { + if (task == shutdownTask) { + shutdownTask.run(); + return; + } if (task == FLUSH) { clearThreshold.run(); // should set to 0 no matter the value. There's a single thread setting this value back to zero @@ -94,4 +99,15 @@ public class ThresholdActor<T> extends ProcessorBase<Object> { task(FLUSH); } } + + public void shutdown(Runnable runnable) { + // do no more pending work + tasks.clear(); + // run this task next + shutdownTask = runnable; + tasks.add(runnable); + // wait for shutdown task to complete + flush(); + shutdown(); + } } diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java index 85e6ead043..e554d632f7 100644 --- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java @@ -184,4 +184,51 @@ public class ThresholdActorTest { } + @Test + public void testShutdownTask() throws Exception { + AtomicInteger lastAcquireFailed = new AtomicInteger(0); + lastProcessed.set(0); + + Semaphore allowedTasks = new Semaphore(10); + CountDownLatch completedTasks = new CountDownLatch(11); + CountDownLatch pendingTasks = new CountDownLatch(11); + + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + + ThresholdActor<Integer> actor = new ThresholdActor<>(executorService, (i) -> { + try { + pendingTasks.countDown(); + if (allowedTasks.tryAcquire(1, 200, TimeUnit.MILLISECONDS)) { + lastProcessed.set(i); + } else { + lastAcquireFailed.set(i); + } + completedTasks.countDown(); + } catch (InterruptedException ignored) { + } + + }, 1000, (e) -> { + return 1; + }, () -> { + }, () -> { + }); + + // expect allowedTasks tasks to complete + for (int i = 1; i < 100; i++) { + actor.act(i); + } + // wait for task processing + Assert.assertTrue(pendingTasks.await(4, TimeUnit.SECONDS)); + + actor.shutdown(() -> { + lastProcessed.set(lastProcessed.get() * 1000); + }); + + Assert.assertTrue(completedTasks.await(4, TimeUnit.SECONDS)); + + // assert processing terminated at block point + Assert.assertEquals(10000, lastProcessed.get()); + // pending task executed as expected + Assert.assertEquals(11, lastAcquireFailed.get()); + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index dc35b6b1ea..0956c2adca 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -681,6 +681,24 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return this.inWireFormat; } + private void rollbackInProgressLocalTransactions() { + + for (Transaction tx : txMap.values()) { + AMQSession session = (AMQSession) tx.getProtocolData(); + if (session != null) { + session.getCoreSession().resetTX(tx); + try { + session.getCoreSession().rollback(false); + } catch (Exception expectedOnExistingOutcome) { + } finally { + session.getCoreSession().resetTX(null); + } + } else { + tx.tryRollback(); + } + } + } + private void shutdown(boolean fail) { try { @@ -754,9 +772,19 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public void fail(ActiveMQException me, String message) { - for (Transaction tx : txMap.values()) { - tx.tryRollback(); + final ThresholdActor<Command> localVisibleActor = openWireActor; + if (localVisibleActor != null) { + localVisibleActor.shutdown(() -> doFail(me, message)); + } else { + doFail(me, message); } + } + + private void doFail(ActiveMQException me, String message) { + + recoverOperationContext(); + + rollbackInProgressLocalTransactions(); if (me != null) { //filter it like the other protocols diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java index 9d086a1f54..80d5ca05df 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java @@ -17,28 +17,44 @@ package org.apache.activemq.artemis.tests.integration.openwire; import javax.jms.Connection; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import java.io.PrintStream; +import java.lang.invoke.MethodHandles; import java.util.Map; - +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.RedeliveryPolicy; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.cli.commands.tools.PrintData; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.transport.failover.FailoverTransport; import org.junit.Assert; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + @Override public void setUp() throws Exception { realStore = true; @@ -176,4 +192,120 @@ public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase { } } } + + @Test(timeout = 60_000) + public void testExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws Exception { + Connection exConn = null; + + SimpleString durableQueue = new SimpleString("exampleQueueTwo"); + this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true)); + AtomicInteger batchConsumed = new AtomicInteger(0); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + exFact.setWatchTopicAdvisories(false); + + RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + redeliveryPolicy.setMaximumRedeliveries(4000); + exFact.setRedeliveryPolicy(redeliveryPolicy); + + Queue queue = new ActiveMQQueue("exampleQueueTwo"); + + exConn = exFact.createConnection(); + + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("This is a text message"); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + int numMessages = 600; + for (int i = 0; i < numMessages; i++) { + message.setIntProperty("SEQ", i); + producer.send(message); + } + session.close(); + exConn.close(); + + final int batch = numMessages; + AtomicInteger commits = new AtomicInteger(0); + AtomicBoolean done = new AtomicBoolean(false); + while (!done.get()) { + // connection per batch attempt + exConn = exFact.createConnection(); + ((ActiveMQConnection) exConn).setCloseTimeout(1); // so rollback on close won't block after socket close exception + + exConn.start(); + + session = exConn.createSession(true, Session.SESSION_TRANSACTED); + + MessageConsumer messageConsumer = session.createConsumer(queue); + TextMessage messageReceived = null; + for (int j = 0; j < batch; j++) { + messageReceived = (TextMessage) messageConsumer.receive(2000); + if (messageReceived == null) { + done.set(true); + break; + } + batchConsumed.incrementAndGet(); + assertEquals("This is a text message", messageReceived.getText()); + } + + // arrange concurrent commit - ack/commit + // with server side error, potential for ack/commit and close-on-fail to contend + final CountDownLatch latch = new CountDownLatch(1); + Session finalSession = session; + executorService.submit(new Runnable() { + @Override + public void run() { + try { + latch.countDown(); + finalSession.commit(); + commits.incrementAndGet(); + + } catch (JMSException e) { + } + } + }); + + latch.await(1, TimeUnit.SECONDS); + // force a local socket close such that the broker sees an exception on the connection and fails the consumer via serverConsumer close + ((FailoverTransport) ((org.apache.activemq.ActiveMQConnection) exConn).getTransport().narrow(FailoverTransport.class)).stop(); + exConn.close(); + } + } finally { + if (exConn != null) { + exConn.close(); + } + } + + logger.info("Done after: {}, queue: {}", batchConsumed.get(), server.locateQueue(durableQueue)); + try { + Wait.assertEquals(0L, () -> server.locateQueue(durableQueue).getDeliveringCount(), 1000); + } catch (Throwable e) { + + final AtomicBoolean doOut = new AtomicBoolean(false); + PrintStream out = new PrintStream(System.out) { + + @Override + public void println(String s) { + if (doOut.get()) { + super.println(s); + } else { + if (s.startsWith("### Failed Transactions")) { + doOut.set(true); + super.println(s); + } + } + } + }; + PrintData.printData(server.getConfiguration().getBindingsLocation(),server.getConfiguration().getJournalLocation(),server.getConfiguration().getPagingLocation(), out, true, true, true, false, -1); + + throw e; + } + } + }