This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new d4fff2fd58 ARTEMIS-5155 Race on AMQP large message read and close
d4fff2fd58 is described below

commit d4fff2fd580d4cdc2de276dcc9bad83120aa5f25
Author: Timothy Bish <[email protected]>
AuthorDate: Mon Nov 18 17:45:59 2024 -0500

    ARTEMIS-5155 Race on AMQP large message read and close
    
    When the final frame of a large message is being written to the file in
    the session thread and an IO error occurs such that that connection is
    torn down, the large message reader can be closed before the message is
    fully processed resulting in corruption. The large message file close
    logic needs to occur on the session thread so that the processing of the
    bytes can finish and the message gets added to the Queue and the close
    can react by not deleting the file when it runs following the read task.
---
 .../amqp/proton/AMQPLargeMessageReader.java        | 41 ++++++++++++++++++----
 1 file changed, 34 insertions(+), 7 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
index 63af7b1418..04d786d7e7 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
+import java.lang.invoke.MethodHandles;
+
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPLargeMessage;
@@ -25,6 +27,8 @@ import 
org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.codec.ReadableBuffer;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reader of {@link AMQPLargeMessage} content which reads all bytes and 
completes once a
@@ -32,6 +36,8 @@ import org.apache.qpid.proton.engine.Receiver;
  */
 public class AMQPLargeMessageReader implements MessageReader {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    private final ProtonAbstractReceiver serverReceiver;
 
    private volatile AMQPLargeMessage currentMessage;
@@ -51,14 +57,27 @@ public class AMQPLargeMessageReader implements 
MessageReader {
    public void close() {
       if (!closed) {
          try {
-            AMQPLargeMessage localCurrentMessage = currentMessage;
-            if (localCurrentMessage != null) {
-               localCurrentMessage.deleteFile();
+            final AMQPSessionCallback sessionSPI = 
serverReceiver.getSessionContext().getSessionSPI();
+
+            if (currentMessage != null) {
+               sessionSPI.execute(() -> {
+                  // Run the file delete on the session thread, this allows 
processing of the
+                  // last addBytes to complete which might allow the message 
to be fully read
+                  // in which case currentMessage will be nulled and we won't 
delete it as it
+                  // will have already been handed to the connection thread 
for enqueue.
+                  if (currentMessage != null) {
+                     try {
+                        currentMessage.deleteFile();
+                     } catch (Throwable error) {
+                        
ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
+                     } finally {
+                        currentMessage = null;
+                     }
+                  }
+               });
             }
-         } catch (Throwable error) {
-            ActiveMQServerLogger.LOGGER.errorDeletingLargeMessageFile(error);
-         } finally {
-            currentMessage = null;
+         } catch (Exception ex) {
+            logger.trace("AMQP Large Message reader close ignored error: ", 
ex);
          }
 
          deliveryAnnotations = null;
@@ -117,6 +136,14 @@ public class AMQPLargeMessageReader implements 
MessageReader {
    private void addBytes(Delivery delivery, ReadableBuffer dataBuffer, boolean 
isPartial) {
       final AMQPLargeMessage localCurrentMessage = currentMessage;
 
+      // Add bytes runs on the session thread and if the close is called and 
the scheduled file
+      // delete occurs on the session thread first then current message will 
be null and we return.
+      // But if the closed delete hasn't run first we can safely continue 
processing this message
+      // in hopes we already read all the bytes before the connection was 
dropped.
+      if (localCurrentMessage == null) {
+         return;
+      }
+
       try {
          localCurrentMessage.addBytes(dataBuffer);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to