This is an automated email from the ASF dual-hosted git repository.

gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f9a72d257 ARTEMIS-4432 respect actor and operation context for 
openwire connection failure processing
8f9a72d257 is described below

commit 8f9a72d257f6c75810a8d354006e1a6f015bf082
Author: Gary Tully <gary.tu...@gmail.com>
AuthorDate: Mon Sep 18 16:16:43 2023 +0100

    ARTEMIS-4432 respect actor and operation context for openwire connection 
failure processing
---
 .../artemis/utils/actors/ThresholdActor.java       |  16 +++
 .../artemis/utils/actors/ThresholdActorTest.java   |  47 ++++++++
 .../core/protocol/openwire/OpenWireConnection.java |  32 ++++-
 .../PrefetchRedeliveryCountOpenwireTest.java       | 134 ++++++++++++++++++++-
 4 files changed, 226 insertions(+), 3 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
index e0ff665173..4e7bbb6ded 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/actors/ThresholdActor.java
@@ -41,6 +41,7 @@ public class ThresholdActor<T> extends ProcessorBase<Object> {
    private final ActorListener<T> listener;
    private final Runnable overThreshold;
    private final Runnable clearThreshold;
+   private volatile Runnable shutdownTask;
 
    public ThresholdActor(Executor parent, ActorListener<T> listener, int 
maxSize, ToIntFunction<T> sizeGetter, Runnable overThreshold, Runnable 
clearThreshold) {
       super(parent);
@@ -53,6 +54,10 @@ public class ThresholdActor<T> extends ProcessorBase<Object> 
{
 
    @Override
    protected final void doTask(Object task) {
+      if (task == shutdownTask) {
+         shutdownTask.run();
+         return;
+      }
       if (task == FLUSH) {
          clearThreshold.run();
          // should set to 0 no matter the value. There's a single thread 
setting this value back to zero
@@ -94,4 +99,15 @@ public class ThresholdActor<T> extends ProcessorBase<Object> 
{
          task(FLUSH);
       }
    }
+
+   public void shutdown(Runnable runnable) {
+      // do no more pending work
+      tasks.clear();
+      // run this task next
+      shutdownTask = runnable;
+      tasks.add(runnable);
+      // wait for shutdown task to complete
+      flush();
+      shutdown();
+   }
 }
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
index 85e6ead043..e554d632f7 100644
--- 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/actors/ThresholdActorTest.java
@@ -184,4 +184,51 @@ public class ThresholdActorTest {
    }
 
 
+   @Test
+   public void testShutdownTask() throws Exception {
+      AtomicInteger lastAcquireFailed = new AtomicInteger(0);
+      lastProcessed.set(0);
+
+      Semaphore allowedTasks = new Semaphore(10);
+      CountDownLatch completedTasks = new CountDownLatch(11);
+      CountDownLatch pendingTasks = new CountDownLatch(11);
+
+      final ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+
+      ThresholdActor<Integer> actor = new ThresholdActor<>(executorService, 
(i) -> {
+         try {
+            pendingTasks.countDown();
+            if (allowedTasks.tryAcquire(1, 200, TimeUnit.MILLISECONDS)) {
+               lastProcessed.set(i);
+            } else {
+               lastAcquireFailed.set(i);
+            }
+            completedTasks.countDown();
+         } catch (InterruptedException ignored) {
+         }
+
+      }, 1000, (e) -> {
+         return 1;
+      }, () -> {
+      }, () -> {
+      });
+
+      // expect allowedTasks tasks to complete
+      for (int i = 1; i < 100; i++) {
+         actor.act(i);
+      }
+      // wait for task processing
+      Assert.assertTrue(pendingTasks.await(4, TimeUnit.SECONDS));
+
+      actor.shutdown(() -> {
+         lastProcessed.set(lastProcessed.get() * 1000);
+      });
+
+      Assert.assertTrue(completedTasks.await(4, TimeUnit.SECONDS));
+
+      // assert processing terminated at block point
+      Assert.assertEquals(10000, lastProcessed.get());
+      // pending task executed as expected
+      Assert.assertEquals(11, lastAcquireFailed.get());
+   }
 }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index dc35b6b1ea..0956c2adca 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -681,6 +681,24 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
       return this.inWireFormat;
    }
 
+   private void rollbackInProgressLocalTransactions() {
+
+      for (Transaction tx : txMap.values()) {
+         AMQSession session = (AMQSession) tx.getProtocolData();
+         if (session != null) {
+            session.getCoreSession().resetTX(tx);
+            try {
+               session.getCoreSession().rollback(false);
+            } catch (Exception expectedOnExistingOutcome) {
+            } finally {
+               session.getCoreSession().resetTX(null);
+            }
+         } else {
+            tx.tryRollback();
+         }
+      }
+   }
+
    private void shutdown(boolean fail) {
 
       try {
@@ -754,9 +772,19 @@ public class OpenWireConnection extends 
AbstractRemotingConnection implements Se
    @Override
    public void fail(ActiveMQException me, String message) {
 
-      for (Transaction tx : txMap.values()) {
-         tx.tryRollback();
+      final ThresholdActor<Command> localVisibleActor = openWireActor;
+      if (localVisibleActor != null) {
+         localVisibleActor.shutdown(() -> doFail(me, message));
+      } else {
+         doFail(me, message);
       }
+   }
+
+   private void doFail(ActiveMQException me, String message) {
+
+      recoverOperationContext();
+
+      rollbackInProgressLocalTransactions();
 
       if (me != null) {
          //filter it like the other protocols
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
index 9d086a1f54..80d5ca05df 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/PrefetchRedeliveryCountOpenwireTest.java
@@ -17,28 +17,44 @@
 package org.apache.activemq.artemis.tests.integration.openwire;
 
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
+import java.io.PrintStream;
+import java.lang.invoke.MethodHandles;
 import java.util.Map;
-
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.RedeliveryPolicy;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.cli.commands.tools.PrintData;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.failover.FailoverTransport;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PrefetchRedeliveryCountOpenwireTest extends OpenWireTestBase {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    @Override
    public void setUp() throws Exception {
       realStore = true;
@@ -176,4 +192,120 @@ public class PrefetchRedeliveryCountOpenwireTest extends 
OpenWireTestBase {
          }
       }
    }
+
+   @Test(timeout = 60_000)
+   public void 
testExclusiveConsumerTransactionalBatchOnReconnectionLargePrefetch() throws 
Exception {
+      Connection exConn = null;
+
+      SimpleString durableQueue = new SimpleString("exampleQueueTwo");
+      this.server.createQueue(new 
QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));
+      AtomicInteger batchConsumed = new AtomicInteger(0);
+
+      try {
+         ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+         exFact.setWatchTopicAdvisories(false);
+
+         RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+         redeliveryPolicy.setMaximumRedeliveries(4000);
+         exFact.setRedeliveryPolicy(redeliveryPolicy);
+
+         Queue queue = new ActiveMQQueue("exampleQueueTwo");
+
+         exConn = exFact.createConnection();
+
+         exConn.start();
+
+         Session session = exConn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+         MessageProducer producer = session.createProducer(queue);
+
+         TextMessage message = session.createTextMessage("This is a text 
message");
+
+         ExecutorService executorService = Executors.newSingleThreadExecutor();
+         int numMessages = 600;
+         for (int i = 0; i < numMessages; i++) {
+            message.setIntProperty("SEQ", i);
+            producer.send(message);
+         }
+         session.close();
+         exConn.close();
+
+         final int batch = numMessages;
+         AtomicInteger commits = new AtomicInteger(0);
+         AtomicBoolean done = new AtomicBoolean(false);
+         while (!done.get()) {
+            // connection per batch attempt
+            exConn = exFact.createConnection();
+            ((ActiveMQConnection) exConn).setCloseTimeout(1); // so rollback 
on close won't block after socket close exception
+
+            exConn.start();
+
+            session = exConn.createSession(true, Session.SESSION_TRANSACTED);
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            TextMessage messageReceived = null;
+            for (int j = 0; j < batch; j++) {
+               messageReceived = (TextMessage) messageConsumer.receive(2000);
+               if (messageReceived == null) {
+                  done.set(true);
+                  break;
+               }
+               batchConsumed.incrementAndGet();
+               assertEquals("This is a text message", 
messageReceived.getText());
+            }
+
+            // arrange concurrent commit - ack/commit
+            // with server side error, potential for ack/commit and 
close-on-fail to contend
+            final CountDownLatch latch = new CountDownLatch(1);
+            Session finalSession = session;
+            executorService.submit(new Runnable() {
+               @Override
+               public void run() {
+                  try {
+                     latch.countDown();
+                     finalSession.commit();
+                     commits.incrementAndGet();
+
+                  } catch (JMSException e) {
+                  }
+               }
+            });
+
+            latch.await(1, TimeUnit.SECONDS);
+            // force a local socket close such that the broker sees an 
exception on the connection and fails the consumer via serverConsumer close
+            ((FailoverTransport) ((org.apache.activemq.ActiveMQConnection) 
exConn).getTransport().narrow(FailoverTransport.class)).stop();
+            exConn.close();
+         }
+      } finally {
+         if (exConn != null) {
+            exConn.close();
+         }
+      }
+
+      logger.info("Done after: {}, queue: {}", batchConsumed.get(), 
server.locateQueue(durableQueue));
+      try {
+         Wait.assertEquals(0L, () -> 
server.locateQueue(durableQueue).getDeliveringCount(), 1000);
+      } catch (Throwable e) {
+
+         final AtomicBoolean doOut = new AtomicBoolean(false);
+         PrintStream out = new PrintStream(System.out) {
+
+            @Override
+            public void println(String s) {
+               if (doOut.get()) {
+                  super.println(s);
+               } else {
+                  if (s.startsWith("### Failed Transactions")) {
+                     doOut.set(true);
+                     super.println(s);
+                  }
+               }
+            }
+         };
+         
PrintData.printData(server.getConfiguration().getBindingsLocation(),server.getConfiguration().getJournalLocation(),server.getConfiguration().getPagingLocation(),
 out, true, true, true, false, -1);
+
+         throw e;
+      }
+   }
+
 }

Reply via email to