This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 75a0a9ab98 ARTEMIS-5890 AMQP LargeMessage writer should check sender 
state before attempting a write
75a0a9ab98 is described below

commit 75a0a9ab981be060c51a2a2091eb30807930ae7e
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Feb 4 11:32:55 2026 -0500

    ARTEMIS-5890 AMQP LargeMessage writer should check sender state before 
attempting a write
    
    When attempting to trigger a write either from the delivery method or from 
a call to
    the runnable hook in MessageReference the writer should check its parent 
sender to
    ensure that it hasn't been remotely closed to avoid trigger exceptions due 
to the
    sender having been closed or its parent connection.
---
 .../amqp/proton/AMQPLargeMessageWriter.java        |  14 +++
 .../amqp/proton/ProtonServerSenderContext.java     |   4 +
 .../integration/amqp/AmqpLargeMessageTest.java     | 139 +++++++++++++++++++++
 3 files changed, 157 insertions(+)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java
index 833cee5f2b..2fdfea90b3 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java
@@ -138,6 +138,15 @@ public class AMQPLargeMessageWriter implements 
MessageWriter {
       closed = false;
    }
 
+   @Override
+   public void accept(MessageReference messageReference) {
+      if (serverSender.isClosed()) {
+         logger.trace("Server sender was closed before queued write attempt 
was executed");
+      } else {
+         writeBytes(messageReference);
+      }
+   }
+
    @Override
    public void writeBytes(MessageReference messageReference) {
       if (protonSender.getLocalState() == EndpointState.CLOSED) {
@@ -174,6 +183,11 @@ public class AMQPLargeMessageWriter implements 
MessageWriter {
          return;
       }
 
+      if (serverSender.isClosed()) {
+         logger.trace("Server sender was closed before queued write attempt 
was executed");
+         return;
+      }
+
       // This is discounting some bytes due to Transfer payload
       final int frameSize = 
protonSender.getSession().getConnection().getTransport().getOutboundFrameSizeLimit()
 - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 428cf8aee6..fb837a3b22 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -136,6 +136,10 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
       return sender;
    }
 
+   public boolean isClosed() {
+      return closed;
+   }
+
    @Override
    public void onFlow(int currentCredits, boolean drain) {
       if (logger.isDebugEnabled()) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
index e033dc9c2c..6114b8e7cf 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpLargeMessageTest.java
@@ -37,6 +37,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.BytesMessage;
@@ -51,6 +55,7 @@ import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -83,12 +88,14 @@ import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.message.impl.MessageImpl;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.concurrent.atomic.AtomicInteger;
 
 @ExtendWith(ParameterizedTestExtension.class)
 public class AmqpLargeMessageTest extends AmqpClientTestSupport {
@@ -1176,6 +1183,138 @@ public class AmqpLargeMessageTest extends 
AmqpClientTestSupport {
       runAfter(server::stop);
    }
 
+   @Test
+   public void testInterruptStreaming() throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", 
"tcp://localhost:61616");
+
+      final int MESSAGE_COUNT = 20;
+      final int MESSAGE_SIZE = 200 * 1024;
+      final int THREADS = 10;
+      byte[] payload = createLargePayload(MESSAGE_SIZE);
+
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler(true)) {
+
+         final CyclicBarrier consumersReady = new CyclicBarrier(THREADS + 1);
+         final CountDownLatch consumersDone = new CountDownLatch(THREADS);
+         final CountDownLatch producersDone = new CountDownLatch(THREADS);
+         final AtomicInteger countError = new AtomicInteger(0);
+
+         ExecutorService executorService = 
Executors.newFixedThreadPool(THREADS * 2);
+         runAfter(executorService::shutdownNow);
+
+         for (int i = 0; i < THREADS; i++) {
+            final int threadId = i;
+            executorService.execute(() -> {
+               try {
+                  consumeForInterruptedStreaming(factory, threadId, 
consumersReady, MESSAGE_COUNT, payload);
+               } catch (Throwable error) {
+                  countError.incrementAndGet();
+                  logger.warn(error.getMessage(), error);
+               } finally {
+                  consumersDone.countDown();
+               }
+            });
+         }
+
+         consumersReady.await(30, TimeUnit.SECONDS);
+
+         for (int i = 0; i < THREADS; i++) {
+            executorService.execute(() -> {
+               try {
+                  produceForInterruptedStreaming(factory, MESSAGE_COUNT, 
payload);
+               } catch (Throwable e) {
+                  countError.incrementAndGet();
+                  logger.warn(e.getMessage(), e);
+               } finally {
+                  producersDone.countDown();
+               }
+            });
+         }
+
+         assertTrue(consumersDone.await(5, TimeUnit.MINUTES));
+         assertTrue(producersDone.await(5, TimeUnit.MINUTES));
+
+         assertEquals(0, countError.get(), "There are exceptions on the 
producers or consumers");
+
+         assertFalse(loggerHandler.findTrace("IllegalArgumentException"), 
"Server log contains IllegalArgumentException");
+      }
+   }
+
+   private static void produceForInterruptedStreaming(ConnectionFactory 
factory, int messageCount, byte[] payload) throws Throwable {
+      Connection connection = null;
+      try {
+         connection = factory.createConnection();
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic("testTopic");
+         MessageProducer producer = session.createProducer(topic);
+
+         for (int j = 0; j < messageCount; j++) {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(payload);
+            producer.send(message);
+
+            if ((j + 1) % 10 == 0) {
+               session.commit();
+            }
+         }
+         session.commit();
+      } finally {
+         if (connection != null) {
+            try {
+               connection.close();
+            } catch (Exception e) {
+               logger.error("Error closing producer connection", e);
+            }
+         }
+      }
+   }
+
+   private static void consumeForInterruptedStreaming(ConnectionFactory 
factory,
+                                                      int threadId,
+                                                      CyclicBarrier startFlag, 
int messageCount, byte[] expectedPayload) throws Exception {
+      Connection connection = null;
+      try {
+         connection = factory.createConnection();
+         connection.setClientID("consumer-" + threadId);
+
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic("testTopic");
+
+         // Create durable subscriber
+         MessageConsumer consumer = session.createDurableSubscriber(topic, 
"sub-" + threadId);
+
+         startFlag.await(10, TimeUnit.SECONDS);
+         connection.start();
+
+         for (int i = 0; i < messageCount; i++) {
+            BytesMessage msg = (BytesMessage) 
consumer.receive(TimeUnit.SECONDS.toMillis(30));
+            assertNotNull(msg);
+
+            int bodySize = (int)msg.getBodyLength();
+
+            assertEquals(bodySize, expectedPayload.length);
+
+            byte[] receivedPayLoad =  new byte[bodySize];
+            msg.readBytes(receivedPayLoad);
+
+            assertArrayEquals(expectedPayload, receivedPayLoad);
+
+            if (i % 10 == 0) {
+               session.commit();
+            }
+         }
+         session.commit();
+
+      } finally {
+         if (connection != null) {
+            try {
+               connection.close();
+            } catch (Exception e) {
+               logger.error("Error closing consumer connection", e);
+            }
+         }
+      }
+   }
 
    private void sendObjectMessages(int nMsgs, ConnectionFactory factory) 
throws Exception {
       try (Connection connection = factory.createConnection()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to