[ https://issues.apache.org/jira/browse/ARTEMIS-4668?focusedWorklogId=908385&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-908385 ]
ASF GitHub Bot logged work on ARTEMIS-4668:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 05/Mar/24 18:12
            Start Date: 05/Mar/24 18:12
    Worklog Time Spent: 10m
      Work Description: clebertsuconic commented on code in PR #4840:
URL: https://github.com/apache/activemq-artemis/pull/4840#discussion_r1513284233


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
          currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
          currentMessage.parseHeader(dataBuffer);
-
+         logger.trace("Initializing current message {} on {}", currentMessage, this);
          sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage);
+         sessionSPI.execute(() -> validateFile(currentMessage));
       }
 
-      currentMessage.addBytes(dataBuffer);
+      serverReceiver.getConnection().disableAutoRead();
 
-      final AMQPLargeMessage result;
+      byte[] bytes = new byte[dataBuffer.remaining()];
+      dataBuffer.get(bytes);
 
-      if (!delivery.isPartial()) {
-         currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
-         result = currentMessage;
-         // We don't want a close to delete the file now, we've released the resources.
-         currentMessage = null;
-         deliveryAnnotations = result.getDeliveryAnnotations();
-      } else {
-         result = null;
+      boolean partial = delivery.isPartial();
+
+      sessionSPI.execute(() -> addBytes(delivery, bytes, partial));
+
+      return partial;
+   }
+
+   private void validateFile(AMQPLargeMessage message) {
+      try {
+         message.validateFile();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+         close();
+         serverReceiver.connection.exception(e);
       }
+   }
 
-      return result;
+   private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) {
+      ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes);
+      try {
+         logger.trace("Adding {} bytes on currentMessage={}, this={}", dataBuffer.remaining(), currentMessage, this);
+         currentMessage.addBytes(dataBuffer);
+
+         if (!isPartial) {
+            final AMQPLargeMessage message = currentMessage;
+            message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true);
+            logger.trace("finishing {} on {}", currentMessage, this);
+            // We don't want a close to delete the file now, we've released the resources.
+            currentMessage = null;
+            close();

Review Comment:
   I added a commit to be squashed addressing this



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() {
    }
 
    @Override
-   public Message readBytes(Delivery delivery) throws Exception {
+   public boolean readBytes(Delivery delivery) throws Exception {
       if (closed) {
          throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed");
       }
 
+      serverReceiver.connection.requireInHandler();
+
       final Receiver receiver = ((Receiver) delivery.getLink());
       final ReadableBuffer dataBuffer = receiver.recv();
 
+      final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
+
       if (currentMessage == null) {
-         final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI();
          final long id = sessionSPI.getStorageManager().generateID();
          currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager());
          currentMessage.parseHeader(dataBuffer);
-
+         logger.trace("Initializing current message {} on {}", currentMessage, this);
          sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage);
+         sessionSPI.execute(() -> validateFile(currentMessage));

Review Comment:
   I added a commit to be squashed addressing this



Issue Time Tracking
-------------------

    Worklog Id:     (was: 908385)
    Time Spent: 4h 20m  (was: 4h 10m)

> Move AMQP Large Message File Handling away from Netty thread
> ------------------------------------------------------------
>
>                 Key: ARTEMIS-4668
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4668
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>    Affects Versions: 2.32.0
>            Reporter: Clebert Suconic
>            Assignee: Clebert Suconic
>            Priority: Major
>             Fix For: 2.33.0
>
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> Operations like file.open, file.close, and file.sync should be moved away
> from the Netty thread for AMQP large messages.
> This task now covers moving the processing for AMQP messages only. In the
> near future we may also improve tunneled large messages.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
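The pattern the issue describes (keep file open, write, sync, and close off the network thread) can be sketched in plain Java roughly as follows. This is an illustrative sketch only, not the Artemis implementation: `OffloadedFileWriter`, its `readBytes`/`addBytes` methods, and the single-thread `ioExecutor` are hypothetical stand-ins for the reader and the session executor in the diff, and the back-pressure step (`disableAutoRead()` in the PR) is left out. The part that mirrors the diff is copying the bytes out of the transient receive buffer on the calling thread, then handing the file work to an ordered executor.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: none of these names come from the Artemis code base.
final class OffloadedFileWriter implements AutoCloseable {
    // A single thread preserves the order in which chunks were submitted.
    private final ExecutorService ioExecutor = Executors.newSingleThreadExecutor();
    private final Path file;
    private FileChannel channel; // only ever touched by tasks on ioExecutor

    OffloadedFileWriter(Path file) {
        this.file = file;
    }

    /** Called on the network (event-loop) thread; performs no file I/O itself. */
    boolean readBytes(ByteBuffer transientBuffer, boolean partial) {
        // Copy now: the receive buffer may be reused once this method returns.
        byte[] bytes = new byte[transientBuffer.remaining()];
        transientBuffer.get(bytes);
        ioExecutor.execute(() -> addBytes(bytes, partial));
        return partial;
    }

    /** Runs on ioExecutor: open lazily, append, then sync and close on the last chunk. */
    private void addBytes(byte[] bytes, boolean partial) {
        try {
            if (channel == null) {
                channel = FileChannel.open(file, StandardOpenOption.CREATE,
                        StandardOpenOption.WRITE, StandardOpenOption.APPEND);
            }
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            if (!partial) {
                channel.force(true);
                channel.close();
                channel = null;
            }
        } catch (IOException e) {
            // A real reader would report this back to the connection instead.
            throw new UncheckedIOException(e);
        }
    }

    /** Stops accepting chunks and waits for queued writes to finish. */
    @Override
    public void close() {
        ioExecutor.shutdown();
        try {
            ioExecutor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("large-message", ".bin");
        try (OffloadedFileWriter writer = new OffloadedFileWriter(tmp)) {
            writer.readBytes(ByteBuffer.wrap(new byte[] {1, 2, 3}), true);
            writer.readBytes(ByteBuffer.wrap(new byte[] {4, 5}), false);
        }
        System.out.println("bytes on disk: " + Files.size(tmp));
        Files.delete(tmp);
    }
}
```

Because the caller no longer waits for the write, it cannot return the finished message directly. That is presumably why `readBytes` in the diff now returns only the partial flag and leaves completion to the executor task.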