This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 1db3ae1 ARTEMIS-3529 Fixing integration tests after Web Console
Parsing of Large Messages
1db3ae1 is described below
commit 1db3ae1dc075ee6a126ebeeb57f52ac9c0423574
Author: Clebert Suconic <[email protected]>
AuthorDate: Tue Oct 19 22:27:26 2021 -0400
ARTEMIS-3529 Fixing integration tests after Web Console Parsing of Large
Messages
---
.../message/openmbean/MessageOpenTypeFactory.java | 6 ++-
.../protocol/amqp/broker/AMQPLargeMessage.java | 9 +++-
.../artemis/protocol/amqp/broker/AMQPMessage.java | 58 +++++++++++++++-------
.../tests/integration/amqp/JMXManagementTest.java | 22 +++++---
.../SimpleStreamingLargeMessageTest.java | 3 +-
5 files changed, 71 insertions(+), 27 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
index dee38d3..c66d332 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
@@ -141,7 +141,7 @@ public class MessageOpenTypeFactory<M extends Message> {
rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
}
- Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
+ Map<String, Object> propertyMap = expandProperties(m, valueSizeLimit);
rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" +
propertyMap, valueSizeLimit));
@@ -162,6 +162,10 @@ public class MessageOpenTypeFactory<M extends Message> {
return rc;
}
+ protected Map<String, Object> expandProperties(M m, int valueSizeLimit) {
+ return m.toPropertyMap(valueSizeLimit);
+ }
+
protected String toString(Object value) {
if (value == null) {
return null;
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
index 4f0c4d6..36b03ea 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java
@@ -602,8 +602,13 @@ public class AMQPLargeMessage extends AMQPMessage
implements LargeServerMessage
}
@Override
- public long getPersistentSize() throws ActiveMQException {
- return 0;
+ public long getPersistentSize() {
+ try {
+ return largeBody.getBodySize();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ return 0;
+ }
}
@Override
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 91a3adb..61682b1 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -834,6 +834,25 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
@Override
public Map<String, Object> toPropertyMap(int valueSizeLimit) {
+ return toPropertyMap(false, valueSizeLimit);
+ }
+
+ private Map<String, Object> toPropertyMap(boolean expandPropertyType, int
valueSizeLimit) {
+ String extraPropertiesPrefix;
+ String applicationPropertiesPrefix;
+ String annotationPrefix;
+ String propertiesPrefix;
+ if (expandPropertyType) {
+ extraPropertiesPrefix = "extraProperties.";
+ applicationPropertiesPrefix = "applicationProperties.";
+ annotationPrefix = "messageAnnotations.";
+ propertiesPrefix = "properties.";
+ } else {
+ extraPropertiesPrefix = "";
+ applicationPropertiesPrefix = "";
+ annotationPrefix = "";
+ propertiesPrefix = "";
+ }
Map map = new HashMap<>();
for (SimpleString name : getPropertyNames()) {
Object value = getObjectProperty(name.toString());
@@ -842,45 +861,45 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
value = ((Binary)value).getArray();
}
value = JsonUtil.truncate(value, valueSizeLimit);
- map.put("applicationProperties." + name, value);
+ map.put(applicationPropertiesPrefix + name, value);
}
TypedProperties extraProperties = getExtraProperties();
if (extraProperties != null) {
extraProperties.forEach((s, o) -> {
- map.put("extraProperties." + s.toString(),
JsonUtil.truncate(o.toString(), valueSizeLimit));
+ map.put(extraPropertiesPrefix + s.toString(),
JsonUtil.truncate(o.toString(), valueSizeLimit));
});
}
- addAnnotationsAsProperties(map, messageAnnotations);
+ addAnnotationsAsProperties(annotationPrefix, map, messageAnnotations);
if (properties != null) {
if (properties.getContentType() != null) {
- map.put("properties.contentType",
properties.getContentType().toString());
+ map.put(propertiesPrefix + "contentType",
properties.getContentType().toString());
}
if (properties.getContentEncoding() != null) {
- map.put("properties.contentEncoding",
properties.getContentEncoding().toString());
+ map.put(propertiesPrefix + "contentEncoding",
properties.getContentEncoding().toString());
}
if (properties.getGroupId() != null) {
- map.put("properties.groupID", properties.getGroupId());
+ map.put(propertiesPrefix + "groupID", properties.getGroupId());
}
if (properties.getGroupSequence() != null) {
- map.put("properties.groupSequence",
properties.getGroupSequence().intValue());
+ map.put(propertiesPrefix + "groupSequence",
properties.getGroupSequence().intValue());
}
if (properties.getReplyToGroupId() != null) {
- map.put("properties.replyToGroupId",
properties.getReplyToGroupId());
+ map.put(propertiesPrefix + "replyToGroupId",
properties.getReplyToGroupId());
}
if (properties.getCreationTime() != null) {
- map.put("properties.creationTime",
properties.getCreationTime().getTime());
+ map.put(propertiesPrefix + "creationTime",
properties.getCreationTime().getTime());
}
if (properties.getAbsoluteExpiryTime() != null) {
- map.put("properties.absoluteExpiryTime",
properties.getCreationTime().getTime());
+ map.put(propertiesPrefix + "absoluteExpiryTime",
properties.getCreationTime().getTime());
}
if (properties.getTo() != null) {
- map.put("properties.to", properties.getTo());
+ map.put(propertiesPrefix + "to", properties.getTo());
}
if (properties.getSubject() != null) {
- map.put("properties.subject", properties.getSubject());
+ map.put(propertiesPrefix + "subject", properties.getSubject());
}
}
@@ -888,21 +907,21 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
}
- protected static void addAnnotationsAsProperties(Map map,
MessageAnnotations annotations) {
+ protected static void addAnnotationsAsProperties(String prefix, Map map,
MessageAnnotations annotations) {
if (annotations != null && annotations.getValue() != null) {
for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
String key = entry.getKey().toString();
if ("x-opt-delivery-time".equals(key) && entry.getValue() != null)
{
long deliveryTime = ((Number) entry.getValue()).longValue();
- map.put("message-annotation.x-opt-delivery-time", deliveryTime);
+ map.put(prefix + "x-opt-delivery-time", deliveryTime);
} else if ("x-opt-delivery-delay".equals(key) && entry.getValue()
!= null) {
long delay = ((Number) entry.getValue()).longValue();
- map.put("message-annotation.x-opt-delivery-delay", delay);
+ map.put("x-opt-delivery-delay", delay);
} else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) &&
entry.getValue() != null) {
- map.put("message-annotation.X_OPT_INGRESS_TIME", ((Number)
entry.getValue()).longValue());
+ map.put("X_OPT_INGRESS_TIME", ((Number)
entry.getValue()).longValue());
} else {
try {
- map.put("message-annotation." + key, entry.getValue());
+ map.put(prefix + key, entry.getValue());
} catch (ActiveMQPropertyConversionException e) {
logger.warn(e.getMessage(), e);
}
@@ -1843,6 +1862,11 @@ public abstract class AMQPMessage extends
RefCountMessage implements org.apache.
return rc;
}
+ @Override
+ protected Map<String, Object> expandProperties(AMQPMessage m, int
valueSizeLimit) {
+ return m.toPropertyMap(true, valueSizeLimit);
+ }
+
private byte getType(AMQPMessage m, Properties properties) {
if (m.isLargeMessage()) {
return DEFAULT_TYPE;
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
index 848e190..f32153a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMXManagementTest.java
@@ -80,12 +80,22 @@ public class JMXManagementTest extends JMSClientTestSupport
{
}
//before commit
- assertEquals(num, queueControl.getDeliveringCount());
-
- Map<String, Map<String, Object>[]> result =
queueControl.listDeliveringMessages();
- assertEquals(1, result.size());
-
- Map<String, Object>[] msgMaps =
result.entrySet().iterator().next().getValue();
+ Wait.assertEquals(num, () -> queueControl.getDeliveringCount());
+
+ Map<String, Map<String, Object>[]> result = null;
+ Map<String, Object>[] msgMaps = null;
+ // we might need a few retries, and Wait.assert won't be as efficient in
this case
+ for (int i = 0; i < 10; i++) {
+ result = queueControl.listDeliveringMessages();
+ assertEquals(1, result.size());
+
+ msgMaps = result.entrySet().iterator().next().getValue();
+ if (msgMaps.length == num) {
+ break;
+ } else {
+ Thread.sleep(100);
+ }
+ }
assertEquals(num, msgMaps.length);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
index 2af5653..dc5cf9a 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/largemessages/SimpleStreamingLargeMessageTest.java
@@ -359,7 +359,8 @@ public class SimpleStreamingLargeMessageTest extends
AmqpClientTestSupport {
assertEquals(1, browseResult.length);
if ((boolean) browseResult[0].get("largeMessage")) {
- assertTrue(browseResult[0].containsKey("text"));
+ // The AMQPMessage will parse the body as text (...Large
Message...) while core will parse it differently
+ assertTrue(browseResult[0].containsKey("text") ||
browseResult[0].containsKey("BodyPreview"));
}
connection = client.createConnection();