This is an automated email from the ASF dual-hosted git repository. gtully pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new b0545d9c45 ARTEMIS-4529 NPE with empty core message and STOMP consumer b0545d9c45 is described below commit b0545d9c457668b650dae3f40052944efd115c32 Author: Justin Bertram <jbert...@apache.org> AuthorDate: Wed Dec 6 15:26:07 2023 -0600 ARTEMIS-4529 NPE with empty core message and STOMP consumer --- .../core/protocol/stomp/StompConnection.java | 7 +++--- .../artemis/core/protocol/stomp/StompSession.java | 19 +++++---------- .../protocol/stomp/VersionedStompFrameHandler.java | 27 +++++++++------------- .../protocol/stomp/v12/StompFrameHandlerV12.java | 6 ++--- .../artemis/tests/integration/stomp/StompTest.java | 16 +++++++++++++ 5 files changed, 38 insertions(+), 37 deletions(-) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index f1b54520ce..abb40f5082 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -622,11 +622,10 @@ public final class StompConnection extends AbstractRemotingConnection { return SERVER_NAME; } - public StompFrame createStompMessage(ICoreMessage serverMessage, - ActiveMQBuffer bodyBuffer, + public StompFrame createStompMessage(ICoreMessage message, StompSubscription subscription, - int deliveryCount) throws Exception { - return frameHandler.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount); + int deliveryCount) { + return frameHandler.createMessageFrame(message, subscription, deliveryCount); } public void addStompEventListener(FrameEventListener listener) { diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 1e370166d7..b39e51c094 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -153,29 +153,22 @@ public class StompSession implements SessionCallback { } @Override - public int sendMessage(MessageReference ref, - Message serverMessage, - final ServerConsumer consumer, - int deliveryCount) { - - ICoreMessage coreMessage = serverMessage.toCore(); - - ICoreMessage newServerMessage = serverMessage.toCore(); + public int sendMessage(MessageReference ref, Message serverMessage, final ServerConsumer consumer, int deliveryCount) { + ICoreMessage message = ref.getMessage().toCore(); try { StompSubscription subscription = subscriptions.get(consumer.getID()); // subscription might be null if the consumer was closed if (subscription == null) return 0; StompFrame frame; - ActiveMQBuffer buffer = coreMessage.getDataBuffer(); - frame = connection.createStompMessage(newServerMessage, buffer, subscription, deliveryCount); + frame = connection.createStompMessage(message, subscription, deliveryCount); int length = frame.getEncodedSize(); if (subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) { if (manager.send(connection, frame)) { - final long messageID = newServerMessage.getMessageID(); + final long messageID = message.getMessageID(); final long consumerID = consumer.getID(); // this will be called after the delivery is complete @@ -191,14 +184,14 @@ public class StompSession implements SessionCallback { }); } } else { - messagesToAck.put(newServerMessage.getMessageID(), new Pair<>(consumer.getID(), length)); + messagesToAck.put(message.getMessageID(), new Pair<>(consumer.getID(), length)); // Must send AFTER adding to messagesToAck - or could get acked from client BEFORE it's been added! manager.send(connection, frame); } return length; } catch (Exception e) { - ActiveMQStompProtocolLogger.LOGGER.unableToSendMessageToClient(coreMessage, e); + ActiveMQStompProtocolLogger.LOGGER.unableToSendMessageToClient(message, e); return 0; } } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 2c64eb2fb6..e48fdcd2b6 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -320,31 +320,26 @@ public abstract class VersionedStompFrameHandler { return response; } - public StompFrame createMessageFrame(ICoreMessage serverMessage, - ActiveMQBuffer bodyBuffer, - StompSubscription subscription, - int deliveryCount) throws Exception { + public StompFrame createMessageFrame(ICoreMessage serverMessage, StompSubscription subscription, int deliveryCount) { StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE); if (subscription.getID() != null) { frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscription.getID()); } - ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer : serverMessage.getReadOnlyBodyBuffer(); + ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer(); - int size = buffer.writerIndex(); + byte[] data = new byte[buffer.writerIndex()]; - byte[] data = new byte[size]; - - if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) { - frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length)); - buffer.readBytes(data); - } else { - SimpleString text = buffer.readNullableSimpleString(); - if (text != null) { - data = text.toString().getBytes(StandardCharsets.UTF_8); + if (data.length > 0) { + if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) || serverMessage.getType() == Message.BYTES_TYPE) { + frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length)); + buffer.readBytes(data); } else { - data = new byte[0]; + SimpleString text = buffer.readNullableSimpleString(); + if (text != null) { + data = text.toString().getBytes(StandardCharsets.UTF_8); + } } } frame.setByteBody(data); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java index 77a92259f9..080628775c 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12; import java.util.concurrent.ScheduledExecutorService; -import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; @@ -50,10 +49,9 @@ public class StompFrameHandlerV12 extends StompFrameHandlerV11 { @Override public StompFrame createMessageFrame(ICoreMessage serverMessage, - ActiveMQBuffer bodyBuffer, StompSubscription subscription, - int deliveryCount) throws Exception { - StompFrame frame = super.createMessageFrame(serverMessage, bodyBuffer, subscription, deliveryCount); + int deliveryCount) { + StompFrame frame = super.createMessageFrame(serverMessage, subscription, deliveryCount); if (!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) { frame.addHeader(Stomp.Headers.Message.ACK, String.valueOf(serverMessage.getMessageID())); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 77bf5e9006..0ee8e63a35 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -344,6 +344,22 @@ public class StompTest extends StompTestBase { } + @Test + public void sendEmptyCoreMessage() throws Exception { + conn.connect(defUser, defPass); + subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO); + + // send core JMS message + MessageProducer mp = session.createProducer(session.createQueue(getQueuePrefix() + getQueueName())); + Message m = session.createMessage(); + mp.send(m); + + // Receive STOMP Message + ClientStompFrame frame = conn.receiveFrame(); + assertNotNull(frame); + assertNull(frame.getBody()); + } + public void sendMessageToNonExistentQueue(String queuePrefix, String queue, RoutingType routingType) throws Exception {