tabish121 commented on code in PR #4840:
URL: https://github.com/apache/activemq-artemis/pull/4840#discussion_r1513036542


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
          currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null,
                                                sessionSPI.getCoreMessageObjectPools(),
                                                sessionSPI.getStorageManager());
          currentMessage.parseHeader(dataBuffer);
-
+         logger.trace("Initializing current message {} on {}", currentMessage, this);
          sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage);
+         sessionSPI.execute(() -> validateFile(currentMessage));

Review Comment:
   This check seems mostly pointless now: the code doesn't use a continuation to
   resume delivery after validation. It just hands the check to another thread
   and then continues as if validation had succeeded.
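
   To illustrate the difference, here is a minimal standalone sketch, not the
   Artemis code. The class and method names are hypothetical, and `validate`
   only stands in for `validateFile(currentMessage)`. It contrasts handing the
   check to an executor and moving on with running the rest of the delivery
   only once the check has completed:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class ValidationContinuationSketch {

   // Hypothetical stand-in for validateFile(currentMessage): throws when the check fails.
   static void validate(boolean fileOk) {
      if (!fileOk) {
         throw new IllegalStateException("large message file failed validation");
      }
   }

   // Fire and forget: the caller returns immediately and never learns the outcome.
   static void fireAndForget(Executor executor, boolean fileOk) {
      executor.execute(() -> validate(fileOk));
   }

   // Continuation style: 'resume' runs only after validation succeeded,
   // otherwise 'onFailure' receives the error and delivery does not proceed.
   static CompletableFuture<Void> validateThenResume(Executor executor,
                                                     boolean fileOk,
                                                     Runnable resume,
                                                     Consumer<Throwable> onFailure) {
      return CompletableFuture.runAsync(() -> validate(fileOk), executor)
         .<Void>handle((ignored, error) -> {
            if (error == null) {
               resume.run();
            } else {
               onFailure.accept(error);
            }
            return null;
         });
   }

   public static void main(String[] args) {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      try {
         validateThenResume(executor, true,
            () -> System.out.println("validation passed, resuming delivery"),
            error -> System.out.println("validation failed: " + error)).join();

         validateThenResume(executor, false,
            () -> System.out.println("validation passed, resuming delivery"),
            error -> System.out.println("validation failed, delivery rejected")).join();
      } finally {
         executor.shutdown();
      }
   }
}
```

   In the real reader the continuation would presumably have to get back onto
   the connection handler thread before touching Proton state; that hop is
   left out of the sketch.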



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageReader.java:
##########
@@ -155,6 +149,10 @@ public Message readBytes(Delivery delivery) {
       coreMessage.reloadPersistence(buffer, sessionSPI.getCoreMessageObjectPools());
       coreMessage.setMessageID(sessionSPI.getStorageManager().generateID());
 
-      return coreMessage;
+      serverReceiver.onMessageComplete(delivery, coreMessage, deliveryAnnotations);
+
+      close();

Review Comment:
   The receiver should be closing this in onMessageComplete; a reader shouldn't
   close itself.
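
   A rough sketch of the ownership being suggested, using simplified
   hypothetical `Reader` and `Receiver` classes rather than the actual Artemis
   types. The reader only reports completion, and the receiver, which owns the
   reader, decides when to close it:

```java
public class ReaderOwnershipSketch {

   // Hypothetical, simplified reader: it hands off the message and leaves its lifecycle alone.
   static class Reader {
      private final Receiver receiver;
      private boolean closed;

      Reader(Receiver receiver) {
         this.receiver = receiver;
      }

      void readBytes(String payload) {
         if (closed) {
            throw new IllegalStateException("reader is closed and read cannot proceed");
         }
         // Report completion only; there is deliberately no close() call here.
         receiver.onMessageComplete(this, payload);
      }

      void close() {
         closed = true;
      }

      boolean isClosed() {
         return closed;
      }
   }

   // Hypothetical, simplified receiver: as the owner, it closes the reader once the message is done.
   static class Receiver {
      private String lastMessage;

      void onMessageComplete(Reader reader, String message) {
         lastMessage = message;
         reader.close();
      }

      String getLastMessage() {
         return lastMessage;
      }
   }

   public static void main(String[] args) {
      Receiver receiver = new Receiver();
      Reader reader = new Reader(receiver);
      reader.readBytes("payload");
      System.out.println("message=" + receiver.getLastMessage() + ", reader closed=" + reader.isClosed());
   }
}
```

   Keeping the close in one place means every reader type is torn down the same
   way, whichever reader produced the message.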



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.