tabish121 commented on code in PR #4840:
URL: https://github.com/apache/activemq-artemis/pull/4840#discussion_r1511892522
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationBaseSenderController.java:
##########
@@ -89,6 +89,7 @@ public MessageWriter
selectOutgoingMessageWriter(ProtonServerSenderContext sende
if (message.isLargeMessage()) {
selected = largeMessageWriter != null ? largeMessageWriter :
(largeMessageWriter = new AMQPLargeMessageWriter(sender));
+ largeMessageWriter.openContext(reference);
Review Comment:
Ditto, extending the open likely makes more sense if eventually all these
large message handlers are going to need this on having different opens called
in different places is confusing.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
}
@Override
- public Message readBytes(Delivery delivery) throws Exception {
+ public boolean readBytes(Delivery delivery) throws Exception {
if (closed) {
throw new IllegalStateException("AMQP Large Message Reader is closed
and read cannot proceed");
}
+ serverReceiver.connection.requireInHandler();
+
+ logger.trace("Reading {}", delivery);
+
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
+ final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
+
if (currentMessage == null) {
- final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
- currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null,
-
sessionSPI.getCoreMessageObjectPools(),
- sessionSPI.getStorageManager());
+ currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);
-
+ logger.trace("Initializing current message {} on {}", currentMessage,
this);
sessionSPI.getStorageManager().largeMessageCreated(id,
currentMessage);
+ sessionSPI.execute(currentMessage::validateFile);
Review Comment:
This is now happening in a different thread, how is the error propagated to
the remote peer now that it is gone, and the method is swallowing all errors?
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
}
@Override
- public Message readBytes(Delivery delivery) throws Exception {
+ public boolean readBytes(Delivery delivery) throws Exception {
if (closed) {
throw new IllegalStateException("AMQP Large Message Reader is closed
and read cannot proceed");
}
+ serverReceiver.connection.requireInHandler();
+
+ logger.trace("Reading {}", delivery);
+
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
+ final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
+
if (currentMessage == null) {
- final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
- currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null,
-
sessionSPI.getCoreMessageObjectPools(),
- sessionSPI.getStorageManager());
+ currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);
-
+ logger.trace("Initializing current message {} on {}", currentMessage,
this);
sessionSPI.getStorageManager().largeMessageCreated(id,
currentMessage);
+ sessionSPI.execute(currentMessage::validateFile);
}
- currentMessage.addBytes(dataBuffer);
+ logger.trace("Disable autoread on {}", this);
Review Comment:
There is no toString in this type, logging this just logs the object
reference value, not super helpful for logging. Also if you need to log this
doesn't it make more sense to have the disable and enable methods log it, along
with connection id data.
##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java:
##########
@@ -238,18 +247,18 @@ public void
testReadMessageInByteChunksFromDeliveryBuffer() throws Exception {
when(receiver.recv()).thenReturn(deliveryAnnotations.duplicate());
try {
- assertNull(reader.readBytes(delivery));
+ readMessage = null;
+ reader.readBytes(delivery);
+ Assert.assertNull(readMessage);
} catch (IllegalStateException e) {
fail("Should not throw as the reader should be able to read just
delivery annotations.");
}
- assertNotNull(reader.getDeliveryAnnotations());
+ DeliveryAnnotations currentDeliveryAnnotations =
reader.getDeliveryAnnotations();
Review Comment:
Change appears unnecessary
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
}
@Override
- public Message readBytes(Delivery delivery) throws Exception {
+ public boolean readBytes(Delivery delivery) throws Exception {
if (closed) {
throw new IllegalStateException("AMQP Large Message Reader is closed
and read cannot proceed");
}
+ serverReceiver.connection.requireInHandler();
+
+ logger.trace("Reading {}", delivery);
+
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
+ final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
+
if (currentMessage == null) {
- final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
- currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null,
-
sessionSPI.getCoreMessageObjectPools(),
- sessionSPI.getStorageManager());
+ currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);
-
+ logger.trace("Initializing current message {} on {}", currentMessage,
this);
sessionSPI.getStorageManager().largeMessageCreated(id,
currentMessage);
+ sessionSPI.execute(currentMessage::validateFile);
}
- currentMessage.addBytes(dataBuffer);
+ logger.trace("Disable autoread on {}", this);
+ serverReceiver.getConnection().disableAutoRead();
- final AMQPLargeMessage result;
+ byte[] bytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(bytes);
+ logger.trace("Getting {}", bytes.length);
- if (!delivery.isPartial()) {
-
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(),
true);
- result = currentMessage;
- // We don't want a close to delete the file now, we've released the
resources.
- currentMessage = null;
- deliveryAnnotations = result.getDeliveryAnnotations();
- } else {
- result = null;
- }
+ boolean partial = delivery.isPartial();
+
+ sessionSPI.execute(() -> addBytes(delivery, bytes, partial));
+
+ return partial;
+ }
- return result;
+ private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) {
+ ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes);
+ try {
+ logger.trace("Adding {} bytes on currentMessage={}, this={}",
dataBuffer.remaining(), currentMessage, this);
+ currentMessage.addBytes(dataBuffer);
+
+ if (!isPartial) {
+ final AMQPLargeMessage message = currentMessage;
+
message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(),
true);
+ logger.trace("finishing {} on {}", currentMessage, this);
+ // We don't want a close to delete the file now, we've released
the resources.
+ currentMessage = null;
+ close();
+ serverReceiver.connection.runNow(() -> {
+ DeliveryAnnotations deliveryAnnotations =
message.getDeliveryAnnotations();
+ serverReceiver.onMessageComplete(delivery, message,
deliveryAnnotations);
+ });
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ close();
+ serverReceiver.connection.exception(e);
+ } finally {
+ serverReceiver.getConnection().enableAutoRead();
+ }
}
+
Review Comment:
Unneeded added newline
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreMessageReader.java:
##########
@@ -155,6 +150,10 @@ public Message readBytes(Delivery delivery) {
coreMessage.reloadPersistence(buffer,
sessionSPI.getCoreMessageObjectPools());
coreMessage.setMessageID(sessionSPI.getStorageManager().generateID());
- return coreMessage;
+ serverReceiver.onMessageComplete(delivery, coreMessage,
deliveryAnnotations);
+
+ close();
+
+ return delivery.isPartial();
Review Comment:
You already check isPartial above, can just return false.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
}
@Override
- public Message readBytes(Delivery delivery) throws Exception {
+ public boolean readBytes(Delivery delivery) throws Exception {
if (closed) {
throw new IllegalStateException("AMQP Large Message Reader is closed
and read cannot proceed");
}
+ serverReceiver.connection.requireInHandler();
+
+ logger.trace("Reading {}", delivery);
+
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
+ final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
+
if (currentMessage == null) {
- final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
- currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null,
-
sessionSPI.getCoreMessageObjectPools(),
- sessionSPI.getStorageManager());
+ currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);
-
+ logger.trace("Initializing current message {} on {}", currentMessage,
this);
sessionSPI.getStorageManager().largeMessageCreated(id,
currentMessage);
+ sessionSPI.execute(currentMessage::validateFile);
}
- currentMessage.addBytes(dataBuffer);
+ logger.trace("Disable autoread on {}", this);
+ serverReceiver.getConnection().disableAutoRead();
- final AMQPLargeMessage result;
+ byte[] bytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(bytes);
+ logger.trace("Getting {}", bytes.length);
Review Comment:
In debugging such value log messages don't help to orient you on what is
going on
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
}
@Override
- public Message readBytes(Delivery delivery) throws Exception {
+ public boolean readBytes(Delivery delivery) throws Exception {
if (closed) {
throw new IllegalStateException("AMQP Large Message Reader is closed
and read cannot proceed");
}
+ serverReceiver.connection.requireInHandler();
+
+ logger.trace("Reading {}", delivery);
+
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
+ final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
+
if (currentMessage == null) {
- final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
- currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null,
-
sessionSPI.getCoreMessageObjectPools(),
- sessionSPI.getStorageManager());
+ currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
Review Comment:
Unneeded reformatting.
##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java:
##########
@@ -159,19 +169,18 @@ public void testReadMessageByteByByteFromDeliveryBuffer()
throws Exception {
when(receiver.recv()).thenReturn(deliveryAnnotations.duplicate().position(i -
1).limit(i).slice());
try {
- assertNull(reader.readBytes(delivery));
+ readMessage = null;
+ reader.readBytes(delivery);
+ Assert.assertNull(readMessage);
} catch (IllegalStateException e) {
fail("Should not throw as the reader should be able to read just
delivery annotations.");
}
}
- assertNotNull(reader.getDeliveryAnnotations());
-
- final DeliveryAnnotations annotations = reader.getDeliveryAnnotations();
-
- assertTrue(annotations.getValue().get(Symbol.valueOf("a")).equals("a"));
- assertTrue(annotations.getValue().get(Symbol.valueOf("b")).equals("b"));
- assertTrue(annotations.getValue().get(Symbol.valueOf("c")).equals("c"));
+ DeliveryAnnotations currentAnnotations = reader.getDeliveryAnnotations();
Review Comment:
Change appears unnecessary
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java:
##########
@@ -847,6 +847,7 @@ public MessageWriter
selectOutgoingMessageWriter(ProtonServerSenderContext sende
if (message.isLargeMessage()) {
selected = largeMessageWriter != null ? largeMessageWriter :
(largeMessageWriter = new AMQPLargeMessageWriter(sender));
+ largeMessageWriter.openContext(reference);
Review Comment:
It likely makes more sense to extend the open API in the MessageWriter to
take the message reference instead of adding yet another open method that is
confusing, and not documented as other methods in the API where.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.java:
##########
@@ -453,6 +453,7 @@ public MessageWriter
selectOutgoingMessageWriter(ProtonServerSenderContext sende
if (reference.getMessage() instanceof AMQPLargeMessage) {
selected = largeMessageWriter;
+ largeMessageWriter.openContext(reference);
Review Comment:
Replace with updated open API
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java:
##########
@@ -94,6 +94,18 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public void disableAutoRead() {
+ connectionCallback.getTransportConnection().setAutoRead(false);
+ handler.setReadable(false);
+ }
+
+ public void enableAutoRead() {
+ connectionCallback.getTransportConnection().setAutoRead(true);
+ getHandler().setReadable(true);
+ flush();
+ }
+
+
Review Comment:
Unneeded extra newline.
##########
artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPTunneledCoreLargeMessageReaderTest.java:
##########
@@ -116,21 +128,19 @@ public void
testReadDeliveryAnnotationsFromDeliveryBuffer() throws Exception {
reader.open();
- assertNull(reader.getDeliveryAnnotations());
-
try {
+ readMessage = null;
reader.readBytes(delivery);
+ Assert.assertNull(readMessage); // should not been called yet
} catch (IllegalStateException e) {
fail("Should not throw as the reader should be able to read just
delivery annotations.");
}
- assertNotNull(reader.getDeliveryAnnotations());
-
- final DeliveryAnnotations annotations = reader.getDeliveryAnnotations();
-
- assertTrue(annotations.getValue().get(Symbol.valueOf("a")).equals("a"));
- assertTrue(annotations.getValue().get(Symbol.valueOf("b")).equals("b"));
- assertTrue(annotations.getValue().get(Symbol.valueOf("c")).equals("c"));
+ // peek current delivery annotations
+ DeliveryAnnotations currentAnnotations = reader.getDeliveryAnnotations();
Review Comment:
Change appears unnecessary
##########
tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java:
##########
@@ -640,6 +640,11 @@ public String getText() throws NoSuchElementException {
throw new NoSuchElementException("Message does not contain a String
body");
}
+ public byte[] getBytes() {
+ Data body = (Data)getWrappedMessage().getBody();
+ return body.getValue().getArray();
Review Comment:
The byte array contained within is not guaranteed to be zero indexed or span
the full array, you need to check and account for that if you are going to
expose it.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
}
@Override
- public Message readBytes(Delivery delivery) throws Exception {
+ public boolean readBytes(Delivery delivery) throws Exception {
if (closed) {
throw new IllegalStateException("AMQP Large Message Reader is closed
and read cannot proceed");
}
+ serverReceiver.connection.requireInHandler();
+
+ logger.trace("Reading {}", delivery);
+
final Receiver receiver = ((Receiver) delivery.getLink());
final ReadableBuffer dataBuffer = receiver.recv();
+ final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
+
if (currentMessage == null) {
- final AMQPSessionCallback sessionSPI =
serverReceiver.getSessionContext().getSessionSPI();
final long id = sessionSPI.getStorageManager().generateID();
- currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null,
-
sessionSPI.getCoreMessageObjectPools(),
- sessionSPI.getStorageManager());
+ currentMessage = new AMQPLargeMessage(id,
delivery.getMessageFormat(), null, sessionSPI.getCoreMessageObjectPools(),
sessionSPI.getStorageManager());
currentMessage.parseHeader(dataBuffer);
-
+ logger.trace("Initializing current message {} on {}", currentMessage,
this);
sessionSPI.getStorageManager().largeMessageCreated(id,
currentMessage);
+ sessionSPI.execute(currentMessage::validateFile);
}
- currentMessage.addBytes(dataBuffer);
+ logger.trace("Disable autoread on {}", this);
+ serverReceiver.getConnection().disableAutoRead();
- final AMQPLargeMessage result;
+ byte[] bytes = new byte[dataBuffer.remaining()];
+ dataBuffer.get(bytes);
+ logger.trace("Getting {}", bytes.length);
- if (!delivery.isPartial()) {
-
currentMessage.releaseResources(serverReceiver.getConnection().isLargeMessageSync(),
true);
- result = currentMessage;
- // We don't want a close to delete the file now, we've released the
resources.
- currentMessage = null;
- deliveryAnnotations = result.getDeliveryAnnotations();
- } else {
- result = null;
- }
+ boolean partial = delivery.isPartial();
+
+ sessionSPI.execute(() -> addBytes(delivery, bytes, partial));
+
+ return partial;
+ }
- return result;
+ private void addBytes(Delivery delivery, byte[] bytes, boolean isPartial) {
+ ReadableBuffer dataBuffer = ReadableBuffer.ByteBufferReader.wrap(bytes);
+ try {
+ logger.trace("Adding {} bytes on currentMessage={}, this={}",
dataBuffer.remaining(), currentMessage, this);
+ currentMessage.addBytes(dataBuffer);
+
+ if (!isPartial) {
+ final AMQPLargeMessage message = currentMessage;
+
message.releaseResources(serverReceiver.getConnection().isLargeMessageSync(),
true);
+ logger.trace("finishing {} on {}", currentMessage, this);
+ // We don't want a close to delete the file now, we've released
the resources.
+ currentMessage = null;
+ close();
+ serverReceiver.connection.runNow(() -> {
+ DeliveryAnnotations deliveryAnnotations =
message.getDeliveryAnnotations();
Review Comment:
Can skip temp var creation and just call message.getDeliveryAnnotations
directly.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java:
##########
@@ -81,11 +84,20 @@ public boolean isWriting() {
public void close() {
if (!closed) {
try {
+ try {
+ if (largeBodyReader != null) {
+ largeBodyReader.close();
+ }
+ largeBodyReader = null;
Review Comment:
Should be nulled in a finally block, which would happen in the resetClosed
method called in the main finally block if you added a null there for the large
body reader as you should.
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java:
##########
@@ -59,6 +59,9 @@ public class AMQPLargeMessageWriter implements MessageWriter {
private MessageReference reference;
private AMQPLargeMessage message;
+
+ LargeBodyReader largeBodyReader;
Review Comment:
Can be made private from looking though the code, if not an accessor that
checks opened state is likely a better idea
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageWriter.java:
##########
@@ -96,18 +108,36 @@ public AMQPLargeMessageWriter open() {
throw new IllegalStateException("Trying to open an AMQP Large Message
writer that was not closed");
}
- reset(false);
+ resetOpen();
return this;
}
- private void reset(boolean closedState) {
+ public void openContext(MessageReference reference) {
+ this.reference = reference;
+ this.message = (AMQPLargeMessage) reference.getMessage();
+
+ try {
+ largeBodyReader = message.getLargeBodyReader();
+ largeBodyReader.open();
+ } catch (Exception e) {
+ serverSender.reportDeliveryError(this, reference, e);
+ }
+ }
+
+ private void resetClosed() {
Review Comment:
added member large body reader is no null as it should be here
##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPLargeMessageReader.java:
##########
@@ -77,39 +74,68 @@ public AMQPLargeMessageReader open() {
}
@Override
- public Message readBytes(Delivery delivery) throws Exception {
+ public boolean readBytes(Delivery delivery) throws Exception {
if (closed) {
throw new IllegalStateException("AMQP Large Message Reader is closed
and read cannot proceed");
}
+ serverReceiver.connection.requireInHandler();
+
+ logger.trace("Reading {}", delivery);
Review Comment:
This trace isn't likely to add much useful context just more log noise
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]