tabish121 commented on code in PR #4840: URL: https://github.com/apache/activemq-artemis/pull/4840#discussion_r1513034841
########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java: ########## @@ -77,39 +73,73 @@ public AMQPLargeMessageReader open() { } @Override - public Message readBytes(Delivery delivery) throws Exception { + public boolean readBytes(Delivery delivery) throws Exception { if (closed) { throw new IllegalStateException("AMQP Large Message Reader is closed and read cannot proceed"); } + serverReceiver.connection.requireInHandler(); + final Receiver receiver = ((Receiver) delivery.getLink()); final ReadableBuffer dataBuffer = receiver.recv(); + final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); + if (currentMessage == null) { - final AMQPSessionCallback sessionSPI = serverReceiver.getSessionContext().getSessionSPI(); final long id = sessionSPI.getStorageManager().generateID(); currentMessage = new AMQPLargeMessage(id, delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(), sessionSPI.getStorageManager()); currentMessage.parseHeader(dataBuffer); - + logger.trace("Initializing current message {} on {}", currentMessage, this); sessionSPI.getStorageManager().largeMessageCreated(id, currentMessage); + sessionSPI.execute(() -> validateFile(currentMessage)); } - currentMessage.addBytes(dataBuffer); + serverReceiver.getConnection().disableAutoRead(); - final AMQPLargeMessage result; + byte[] bytes = new byte[dataBuffer.remaining()]; + dataBuffer.get(bytes); - if (!delivery.isPartial()) { - currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); - result = currentMessage; - // We don't want a close to delete the file now, we've released the resources. - currentMessage = null; - deliveryAnnotations = result.getDeliveryAnnotations(); - } else { - result = null; + boolean partial = delivery.isPartial(); + + sessionSPI.execute(() -> addBytes(delivery, bytes, partial)); + + return partial; + } + + private void validateFile(AMQPLargeMessage message) { + try { + message.validateFile(); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + close(); + serverReceiver.connection.exception(e); } + } - return result; + private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) { + ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes); + try { + logger.trace("Adding {} bytes on currentMessage={}, this={}", dataBuffer.remaining(), currentMessage, this); + currentMessage.addBytes(dataBuffer); + + if (!isPartial) { + final AMQPLargeMessage message = currentMessage; + message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(), true); + logger.trace("finishing {} on {}", currentMessage, this); + // We don't want a close to delete the file now, we've released the resources. + currentMessage = null; + close(); Review Comment: This seems to be incorrect as closing here is wrong as a general rule as the resource that opened this should be closing it but also this is occurring in a different thread than the connection thread where the reader was opened which could lead to error on inconsistent state. The receiver should be closing this in the connection thread. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: gitbox-unsubscr...@activemq.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org