This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new b0545d9c45 ARTEMIS-4529 NPE with empty core message and STOMP consumer
b0545d9c45 is described below
commit b0545d9c457668b650dae3f40052944efd115c32
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Dec 6 15:26:07 2023 -0600
ARTEMIS-4529 NPE with empty core message and STOMP consumer
---
.../core/protocol/stomp/StompConnection.java | 7 +++---
.../artemis/core/protocol/stomp/StompSession.java | 19 +++++----------
.../protocol/stomp/VersionedStompFrameHandler.java | 27 +++++++++-------------
.../protocol/stomp/v12/StompFrameHandlerV12.java | 6 ++---
.../artemis/tests/integration/stomp/StompTest.java | 16 +++++++++++++
5 files changed, 38 insertions(+), 37 deletions(-)
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index f1b54520ce..abb40f5082 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -622,11 +622,10 @@ public final class StompConnection extends
AbstractRemotingConnection {
return SERVER_NAME;
}
- public StompFrame createStompMessage(ICoreMessage serverMessage,
- ActiveMQBuffer bodyBuffer,
+ public StompFrame createStompMessage(ICoreMessage message,
StompSubscription subscription,
- int deliveryCount) throws Exception {
- return frameHandler.createMessageFrame(serverMessage, bodyBuffer,
subscription, deliveryCount);
+ int deliveryCount) {
+ return frameHandler.createMessageFrame(message, subscription,
deliveryCount);
}
public void addStompEventListener(FrameEventListener listener) {
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 1e370166d7..b39e51c094 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -153,29 +153,22 @@ public class StompSession implements SessionCallback {
}
@Override
- public int sendMessage(MessageReference ref,
- Message serverMessage,
- final ServerConsumer consumer,
- int deliveryCount) {
-
- ICoreMessage coreMessage = serverMessage.toCore();
-
- ICoreMessage newServerMessage = serverMessage.toCore();
+ public int sendMessage(MessageReference ref, Message serverMessage, final
ServerConsumer consumer, int deliveryCount) {
+ ICoreMessage message = ref.getMessage().toCore();
try {
StompSubscription subscription = subscriptions.get(consumer.getID());
// subscription might be null if the consumer was closed
if (subscription == null)
return 0;
StompFrame frame;
- ActiveMQBuffer buffer = coreMessage.getDataBuffer();
- frame = connection.createStompMessage(newServerMessage, buffer,
subscription, deliveryCount);
+ frame = connection.createStompMessage(message, subscription,
deliveryCount);
int length = frame.getEncodedSize();
if
(subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
if (manager.send(connection, frame)) {
- final long messageID = newServerMessage.getMessageID();
+ final long messageID = message.getMessageID();
final long consumerID = consumer.getID();
// this will be called after the delivery is complete
@@ -191,14 +184,14 @@ public class StompSession implements SessionCallback {
});
}
} else {
- messagesToAck.put(newServerMessage.getMessageID(), new
Pair<>(consumer.getID(), length));
+ messagesToAck.put(message.getMessageID(), new
Pair<>(consumer.getID(), length));
// Must send AFTER adding to messagesToAck - or could get acked
from client BEFORE it's been added!
manager.send(connection, frame);
}
return length;
} catch (Exception e) {
-
ActiveMQStompProtocolLogger.LOGGER.unableToSendMessageToClient(coreMessage, e);
+
ActiveMQStompProtocolLogger.LOGGER.unableToSendMessageToClient(message, e);
return 0;
}
}
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 2c64eb2fb6..e48fdcd2b6 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -320,31 +320,26 @@ public abstract class VersionedStompFrameHandler {
return response;
}
- public StompFrame createMessageFrame(ICoreMessage serverMessage,
- ActiveMQBuffer bodyBuffer,
- StompSubscription subscription,
- int deliveryCount) throws Exception {
+ public StompFrame createMessageFrame(ICoreMessage serverMessage,
StompSubscription subscription, int deliveryCount) {
StompFrame frame = createStompFrame(Stomp.Responses.MESSAGE);
if (subscription.getID() != null) {
frame.addHeader(Stomp.Headers.Message.SUBSCRIPTION,
subscription.getID());
}
- ActiveMQBuffer buffer = bodyBuffer != null ? bodyBuffer :
serverMessage.getReadOnlyBodyBuffer();
+ ActiveMQBuffer buffer = serverMessage.getReadOnlyBodyBuffer();
- int size = buffer.writerIndex();
+ byte[] data = new byte[buffer.writerIndex()];
- byte[] data = new byte[size];
-
- if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE) {
- frame.addHeader(Headers.CONTENT_LENGTH, String.valueOf(data.length));
- buffer.readBytes(data);
- } else {
- SimpleString text = buffer.readNullableSimpleString();
- if (text != null) {
- data = text.toString().getBytes(StandardCharsets.UTF_8);
+ if (data.length > 0) {
+ if (serverMessage.containsProperty(Stomp.Headers.CONTENT_LENGTH) ||
serverMessage.getType() == Message.BYTES_TYPE) {
+ frame.addHeader(Headers.CONTENT_LENGTH,
String.valueOf(data.length));
+ buffer.readBytes(data);
} else {
- data = new byte[0];
+ SimpleString text = buffer.readNullableSimpleString();
+ if (text != null) {
+ data = text.toString().getBytes(StandardCharsets.UTF_8);
+ }
}
}
frame.setByteBody(data);
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
index 77a92259f9..080628775c 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v12/StompFrameHandlerV12.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.stomp.v12;
import java.util.concurrent.ScheduledExecutorService;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompException;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@@ -50,10 +49,9 @@ public class StompFrameHandlerV12 extends
StompFrameHandlerV11 {
@Override
public StompFrame createMessageFrame(ICoreMessage serverMessage,
- ActiveMQBuffer bodyBuffer,
StompSubscription subscription,
- int deliveryCount) throws Exception {
- StompFrame frame = super.createMessageFrame(serverMessage, bodyBuffer,
subscription, deliveryCount);
+ int deliveryCount) {
+ StompFrame frame = super.createMessageFrame(serverMessage, subscription,
deliveryCount);
if
(!subscription.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
frame.addHeader(Stomp.Headers.Message.ACK,
String.valueOf(serverMessage.getMessageID()));
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 77bf5e9006..0ee8e63a35 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -344,6 +344,22 @@ public class StompTest extends StompTestBase {
}
+ @Test
+ public void sendEmptyCoreMessage() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+ // send core JMS message
+ MessageProducer mp =
session.createProducer(session.createQueue(getQueuePrefix() + getQueueName()));
+ Message m = session.createMessage();
+ mp.send(m);
+
+ // Receive STOMP Message
+ ClientStompFrame frame = conn.receiveFrame();
+ assertNotNull(frame);
+ assertNull(frame.getBody());
+ }
+
public void sendMessageToNonExistentQueue(String queuePrefix,
String queue,
RoutingType routingType) throws
Exception {