This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new a833d95 ARTEMIS-3461 Generalize MBean Support on Messages and avoid
conversion to core on AMQP Messages on console browsing
a833d95 is described below
commit a833d95c1fb7fd5ab5a5220a64df80d06815da47
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Oct 14 13:25:56 2021 -0400
ARTEMIS-3461 Generalize MBean Support on Messages and avoid conversion to
core on AMQP Messages on console browsing
Done in collaboration with Erwin Dondorp through
https://github.com/apache/activemq-artemis/pull/3794/
---
.../apache/activemq/artemis/api/core/JsonUtil.java | 13 +-
.../apache/activemq/artemis/api/core/Message.java | 6 +
.../artemis/core/message/impl/CoreMessage.java | 95 +++++++
.../message}/openmbean/CompositeDataConstants.java | 2 +-
.../message/openmbean/MessageOpenTypeFactory.java | 224 +++++++++++++++
.../artemis/protocol/amqp/broker/AMQPMessage.java | 164 +++++++++++
.../core/management/impl/QueueControlImpl.java | 11 +-
.../impl/openmbean/CompositeDataConstants.java | 63 +----
.../management/impl/openmbean/OpenTypeSupport.java | 305 ---------------------
.../impl/openmbean/OpenTypeSupportTest.java | 3 +-
.../integration/management/QueueControlTest.java | 131 ++++++++-
11 files changed, 645 insertions(+), 372 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
index 58cac51..69dfa8a 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/JsonUtil.java
@@ -325,14 +325,19 @@ public final class JsonUtil {
private JsonUtil() {
}
+   /**
+    * Truncates {@code str} to at most {@code valueSizeLimit} characters, appending a
+    * {@code ", + N more"} marker that reports how many characters were dropped.
+    *
+    * @param str            the string to truncate; {@code null} is returned unchanged
+    * @param valueSizeLimit maximum number of characters to keep; a negative value disables
+    *                       truncation, matching the guard applied by {@code truncate(Object, int)}
+    * @return {@code str} itself when no truncation is needed, otherwise the shortened string plus marker
+    */
+   public static String truncateString(final String str, final int valueSizeLimit) {
+      // A negative limit would make substring(0, valueSizeLimit) throw, so treat it as "no limit".
+      if (str == null || valueSizeLimit < 0 || str.length() <= valueSizeLimit) {
+         return str;
+      }
+      return new StringBuilder(valueSizeLimit + 32).append(str, 0, valueSizeLimit).append(", + ").append(str.length() - valueSizeLimit).append(" more").toString();
+   }
+
public static Object truncate(final Object value, final int valueSizeLimit)
{
Object result = value;
if (valueSizeLimit >= 0) {
if (String.class.equals(value.getClass())) {
- String str = (String) value;
- if (str.length() > valueSizeLimit) {
- result = new StringBuilder(valueSizeLimit +
32).append(str.substring(0, valueSizeLimit)).append(", + ").append(str.length()
- valueSizeLimit).append(" more").toString();
- }
+ result = truncateString((String)value, valueSizeLimit);
} else if (value.getClass().isArray()) {
if (byte[].class.equals(value.getClass())) {
if (((byte[]) value).length > valueSizeLimit) {
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
index 5525fee..bef0fa4 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.api.core;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.OpenDataException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
@@ -769,6 +771,10 @@ public interface Message {
/** This should make you convert your message into Core format. */
ICoreMessage toCore();
+ /**
+ * Converts this message into a JMX {@link CompositeData} representation.
+ * This default implementation returns {@code null}; message implementations
+ * that can be rendered for management browsing override it.
+ *
+ * @param fieldsLimit size limit that implementations apply when truncating field values
+ * @param deliveryCount delivery count of the reference being rendered
+ * @return the composite data, or {@code null} from this default implementation
+ * @throws OpenDataException declared so that overriding implementations may report open-type failures
+ */
+ default CompositeData toCompositeData(int fieldsLimit, int deliveryCount)
throws OpenDataException {
+ return null;
+ }
+
/** This should make you convert your message into Core format. */
ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools);
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
index 1129f55..1d5acda 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java
@@ -17,8 +17,15 @@
package org.apache.activemq.artemis.core.message.impl;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.SimpleType;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.zip.DataFormatException;
@@ -33,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import
org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -40,6 +48,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import
org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer;
import org.apache.activemq.artemis.core.message.LargeBodyReader;
+import
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
+import
org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@@ -1216,6 +1226,7 @@ public class CoreMessage extends RefCountMessage
implements ICoreMessage {
return this;
}
+
@Override
public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
return this;
@@ -1290,4 +1301,88 @@ public class CoreMessage extends RefCountMessage
implements ICoreMessage {
return body;
}
+
+
+ //
*******************************************************************************************************************************
+ // Composite Data implementation
+
+   // Shared factories describing Core messages as JMX open types; created once and never reassigned.
+   private static final MessageOpenTypeFactory<CoreMessage> TEXT_FACTORY = new TextMessageOpenTypeFactory();
+   private static final MessageOpenTypeFactory<CoreMessage> BYTES_FACTORY = new BytesMessageOpenTypeFactory();
+
+
+   /**
+    * Renders this Core message as JMX composite data. Text messages use the text
+    * factory; every other message type is exposed through the bytes factory.
+    *
+    * @param fieldsLimit   size limit handed to the factory for truncating values
+    * @param deliveryCount delivery count handed to the factory
+    * @return the composite data built from the selected factory's type and fields
+    * @throws OpenDataException if the composite data cannot be built
+    */
+   @Override
+   public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException {
+      final CompositeType compositeType;
+      final Map<String, Object> values;
+      if (getType() == Message.TEXT_TYPE) {
+         compositeType = TEXT_FACTORY.getCompositeType();
+         values = TEXT_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+      } else {
+         compositeType = BYTES_FACTORY.getCompositeType();
+         values = BYTES_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+      }
+      return new CompositeDataSupport(compositeType, values);
+   }
+
+   /**
+    * Open-type factory for Core messages whose body is exposed as a raw byte array
+    * (used for every message type other than text).
+    */
+   static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> {
+      // Assigned inside init(), which the superclass constructor invokes. Do not give this field
+      // an initializer: it would run after the superclass constructor and overwrite the value.
+      protected ArrayType body;
+
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         body = new ArrayType(SimpleType.BYTE, true);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+         addItem(CompositeDataConstants.BODY, CompositeDataConstants.BODY_DESCRIPTION, body);
+      }
+
+      /**
+       * Adds the message type and (possibly truncated) body bytes to the common fields.
+       * Large messages are reported with an empty body.
+       *
+       * @param m              the message to render
+       * @param valueSizeLimit maximum number of body bytes to expose; negative means no limit
+       * @param delivery       delivery count of the reference
+       */
+      @Override
+      public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+         rc.put(CompositeDataConstants.TYPE, m.getType());
+         if (!m.isLargeMessage()) {
+            ActiveMQBuffer bodyCopy = m.toCore().getReadOnlyBodyBuffer();
+            final int readable = bodyCopy.readableBytes();
+            // A negative limit means "no limit" (JsonUtil.truncate leaves the value alone), so read the
+            // whole body. Otherwise read one byte past the limit so truncate can see that data was cut.
+            final int size = valueSizeLimit < 0 || readable <= valueSizeLimit ? readable : valueSizeLimit + 1;
+            byte[] bytes = new byte[size];
+            bodyCopy.readBytes(bytes);
+            rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes, valueSizeLimit));
+         } else {
+            rc.put(CompositeDataConstants.BODY, new byte[0]);
+         }
+         return rc;
+      }
+   }
+
+   /**
+    * Open-type factory for Core text messages: exposes the message type and the body as a string.
+    */
+   static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory<CoreMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+      }
+
+      /**
+       * Adds the message type and a textual body to the common fields. Large and compressed
+       * messages get a fixed placeholder instead of their content.
+       */
+      @Override
+      public Map<String, Object> getFields(CoreMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         final Map<String, Object> fields = super.getFields(m, valueSizeLimit, delivery);
+         fields.put(CompositeDataConstants.TYPE, m.getType());
+
+         final Object textBody;
+         if (m.isLargeMessage()) {
+            textBody = "[large message]";
+         } else if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
+            textBody = "[compressed]";
+         } else {
+            final SimpleString text = m.toCore().getReadOnlyBodyBuffer().readNullableSimpleString();
+            textBody = text == null ? "" : JsonUtil.truncate(text.toString(), valueSizeLimit);
+         }
+         fields.put(CompositeDataConstants.TEXT_BODY, textBody);
+         return fields;
+      }
+   }
+
+ // Composite Data implementation
+ //
*******************************************************************************************************************************
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java
similarity index 97%
copy from
artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
copy to
artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java
index a8cde03..84fbbfd 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/CompositeDataConstants.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.management.impl.openmbean;
+package org.apache.activemq.artemis.core.message.openmbean;
public interface CompositeDataConstants {
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
new file mode 100644
index 0000000..0028051
--- /dev/null
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/openmbean/MessageOpenTypeFactory.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.message.openmbean;
+
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.jboss.logging.Logger;
+
+/**
+ * Base factory that describes a {@link Message} as a JMX {@link CompositeType} and extracts the
+ * matching field values from a message instance. Subclasses override {@link #init()} to register
+ * additional items and {@link #getFields(Message, int, int)} to populate them.
+ *
+ * @param <M> the concrete message type handled by the factory
+ */
+public class MessageOpenTypeFactory<M extends Message> {
+
+   private static final Logger logger = Logger.getLogger(MessageOpenTypeFactory.class);
+
+   /**
+    * Registers the items through {@link #init()} and builds the composite type.
+    * A failure is logged rather than thrown, in which case {@link #getCompositeType()} returns {@code null}.
+    * Note that {@link #init()} is overridable and runs before subclass field initializers.
+    */
+   public MessageOpenTypeFactory() {
+      try {
+         init();
+         compositeType = createCompositeType();
+      } catch (Exception e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+
+   private CompositeType compositeType;
+   private final List<String> itemNamesList = new ArrayList<>();
+   private final List<String> itemDescriptionsList = new ArrayList<>();
+   private final List<OpenType> itemTypesList = new ArrayList<>();
+
+   protected TabularType stringPropertyTabularType;
+   protected TabularType booleanPropertyTabularType;
+   protected TabularType bytePropertyTabularType;
+   protected TabularType shortPropertyTabularType;
+   protected TabularType intPropertyTabularType;
+   protected TabularType longPropertyTabularType;
+   protected TabularType floatPropertyTabularType;
+   protected TabularType doublePropertyTabularType;
+   // Rows of {item name, TabularType, value Class}; filled in by init() and iterated by getFields().
+   protected Object[][] typedPropertyFields;
+
+   /** @return the name used for the composite type (and, by default, its description) */
+   protected String getTypeName() {
+      return Message.class.getName();
+   }
+
+   /** @return the composite type built by the constructor, or {@code null} if building it failed */
+   public CompositeType getCompositeType() throws OpenDataException {
+      return compositeType;
+   }
+
+   /**
+    * Registers the items common to every message: the header fields, the stringified property map,
+    * and one tabular item per supported property value type.
+    */
+   protected void init() throws OpenDataException {
+
+      addItem(CompositeDataConstants.ADDRESS, CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING);
+      addItem(CompositeDataConstants.MESSAGE_ID, CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
+      addItem(CompositeDataConstants.USER_ID, CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING);
+      addItem(CompositeDataConstants.DURABLE, CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN);
+      addItem(CompositeDataConstants.EXPIRATION, CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG);
+      addItem(CompositeDataConstants.PRIORITY, CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE);
+      addItem(CompositeDataConstants.REDELIVERED, CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
+      addItem(CompositeDataConstants.TIMESTAMP, CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG);
+      addItem(CompositeDataConstants.LARGE_MESSAGE, CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN);
+      addItem(CompositeDataConstants.PERSISTENT_SIZE, CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG);
+
+      addItem(CompositeDataConstants.PROPERTIES, CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
+
+      // now lets expose the type safe properties
+      stringPropertyTabularType = createTabularType(String.class, SimpleType.STRING);
+      booleanPropertyTabularType = createTabularType(Boolean.class, SimpleType.BOOLEAN);
+      bytePropertyTabularType = createTabularType(Byte.class, SimpleType.BYTE);
+      shortPropertyTabularType = createTabularType(Short.class, SimpleType.SHORT);
+      intPropertyTabularType = createTabularType(Integer.class, SimpleType.INTEGER);
+      longPropertyTabularType = createTabularType(Long.class, SimpleType.LONG);
+      floatPropertyTabularType = createTabularType(Float.class, SimpleType.FLOAT);
+      doublePropertyTabularType = createTabularType(Double.class, SimpleType.DOUBLE);
+
+      addItem(CompositeDataConstants.STRING_PROPERTIES, CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION, stringPropertyTabularType);
+      addItem(CompositeDataConstants.BOOLEAN_PROPERTIES, CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION, booleanPropertyTabularType);
+      addItem(CompositeDataConstants.BYTE_PROPERTIES, CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
+      addItem(CompositeDataConstants.SHORT_PROPERTIES, CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
+      addItem(CompositeDataConstants.INT_PROPERTIES, CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
+      addItem(CompositeDataConstants.LONG_PROPERTIES, CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
+      addItem(CompositeDataConstants.FLOAT_PROPERTIES, CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
+      addItem(CompositeDataConstants.DOUBLE_PROPERTIES, CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION, doublePropertyTabularType);
+
+      typedPropertyFields = new Object[][] {
+         {CompositeDataConstants.STRING_PROPERTIES, stringPropertyTabularType, String.class},
+         {CompositeDataConstants.BOOLEAN_PROPERTIES, booleanPropertyTabularType, Boolean.class},
+         {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType, Byte.class},
+         {CompositeDataConstants.SHORT_PROPERTIES, shortPropertyTabularType, Short.class},
+         {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType, Integer.class},
+         {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType, Long.class},
+         {CompositeDataConstants.FLOAT_PROPERTIES, floatPropertyTabularType, Float.class},
+         {CompositeDataConstants.DOUBLE_PROPERTIES, doublePropertyTabularType, Double.class}
+      };
+
+   }
+
+   /**
+    * Extracts the values for the items registered by {@link #init()}.
+    *
+    * @param m              the message to read
+    * @param valueSizeLimit size limit applied to the property map and its string form
+    * @param deliveryCount  delivery count of the reference; a value above 1 marks the message as redelivered
+    * @return a mutable map from item name to value, which subclasses extend with their own items
+    * @throws OpenDataException declared for subclasses that build open data while populating fields
+    */
+   public Map<String, Object> getFields(M m, int valueSizeLimit, int deliveryCount) throws OpenDataException {
+      Map<String, Object> rc = new HashMap<>();
+      rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
+      if (m.getUserID() != null) {
+         rc.put(CompositeDataConstants.USER_ID, "ID:" + m.getUserID().toString());
+      } else {
+         rc.put(CompositeDataConstants.USER_ID, "");
+      }
+      rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" : m.getAddress().toString());
+      rc.put(CompositeDataConstants.DURABLE, m.isDurable());
+      rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
+      rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp());
+      rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
+      rc.put(CompositeDataConstants.REDELIVERED, deliveryCount > 1);
+      rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage());
+      try {
+         rc.put(CompositeDataConstants.PERSISTENT_SIZE, m.getPersistentSize());
+      } catch (final ActiveMQException e1) {
+         // The item is declared as SimpleType.LONG, so the fallback must be a Long: an Integer
+         // would be rejected when the CompositeDataSupport is constructed.
+         rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1L);
+      }
+
+      Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
+
+      rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" + propertyMap, valueSizeLimit));
+
+      // only populate if there are some values
+      for (Object[] typedPropertyInfo : typedPropertyFields) {
+         final String itemName = (String) typedPropertyInfo[0];
+         TabularDataSupport tabularData = null;
+         try {
+            tabularData = createTabularData(propertyMap, (TabularType) typedPropertyInfo[1], (Class) typedPropertyInfo[2]);
+         } catch (Exception e) {
+            // Best effort: a failing typed table must not prevent browsing, but leave a trace.
+            logger.debug("Could not build tabular data for " + itemName, e);
+         }
+         rc.put(itemName, tabularData != null && !tabularData.isEmpty() ? tabularData : null);
+      }
+      return rc;
+   }
+
+   /** Null-safe {@code toString()}: returns {@code null} for a {@code null} value. */
+   protected String toString(Object value) {
+      if (value == null) {
+         return null;
+      }
+      return value.toString();
+   }
+
+   /** Builds the composite type from the items registered so far through {@link #addItem}. */
+   protected CompositeType createCompositeType() throws OpenDataException {
+      String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
+      String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
+      OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
+      return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
+   }
+
+   /** @return the composite type description; defaults to the type name */
+   protected String getDescription() {
+      return getTypeName();
+   }
+
+   /**
+    * Creates a tabular type of {@code key}/{@code value} rows indexed by {@code key}, where the value
+    * column has the given open type.
+    */
+   protected <T> TabularType createTabularType(Class<T> type, OpenType openType) throws OpenDataException {
+      String typeName = "java.util.Map<java.lang.String, " + type.getName() + ">";
+      String[] keyValue = new String[]{"key", "value"};
+      OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
+      CompositeType rowType = new CompositeType(typeName, typeName, keyValue, keyValue, openTypes);
+      return new TabularType(typeName, typeName, rowType, new String[]{"key"});
+   }
+
+   /**
+    * Collects the entries whose value is an instance of {@code valueType} into tabular data.
+    * When {@code valueType} is {@code String}, {@link SimpleString} values are included as strings.
+    */
+   protected TabularDataSupport createTabularData(Map<String, Object> entries,
+                                                  TabularType type,
+                                                  Class valueType) throws IOException, OpenDataException {
+      TabularDataSupport answer = new TabularDataSupport(type);
+
+      for (Map.Entry<String, Object> entry : entries.entrySet()) {
+         Object value = entry.getValue();
+         if (valueType.isInstance(value)) {
+            answer.put(createTabularRowValue(type, entry.getKey(), value));
+         } else if (valueType == String.class && value instanceof SimpleString) {
+            answer.put(createTabularRowValue(type, entry.getKey(), value.toString()));
+         }
+      }
+      return answer;
+   }
+
+   /** Builds one {@code key}/{@code value} row for the given tabular type. */
+   protected CompositeDataSupport createTabularRowValue(TabularType type,
+                                                        String key,
+                                                        Object value) throws OpenDataException {
+      Map<String, Object> fields = new HashMap<>();
+      fields.put("key", key);
+      fields.put("value", value);
+      return new CompositeDataSupport(type.getRowType(), fields);
+   }
+
+   /** Registers one item (name, description, open type) of the composite type being described. */
+   protected void addItem(String name, String description, OpenType type) {
+      itemNamesList.add(name);
+      itemDescriptionsList.add(description);
+      itemTypesList.add(type);
+   }
+}
+
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
index 3926fa5..0609a76 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java
@@ -16,11 +16,17 @@
*/
package org.apache.activemq.artemis.protocol.amqp.broker;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.SimpleType;
+import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,6 +40,8 @@ import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants;
+import
org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.persistence.Persister;
@@ -72,6 +80,8 @@ import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.jboss.logging.Logger;
+import static
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport.getCharsetForTextualContent;
+
/**
* See <a
href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format">AMQP
v1.0 message format</a>
* <pre>
@@ -834,9 +844,64 @@ public abstract class AMQPMessage extends RefCountMessage
implements org.apache.
value = JsonUtil.truncate(value, valueSizeLimit);
map.put(name.toString(), value);
}
+
+ TypedProperties extraProperties = getExtraProperties();
+ if (extraProperties != null) {
+ extraProperties.forEach((s, o) -> {
+ map.put(s.toString(), JsonUtil.truncate(o.toString(),
valueSizeLimit));
+ });
+ }
+ if (!isLargeMessage()) {
+ addAnnotationsAsProperties(map, messageAnnotations);
+ }
+
+ if (properties != null) {
+ if (properties.getContentType() != null) {
+ map.put("properties.getContentType()",
properties.getContentType().toString());
+ }
+ if (properties.getContentEncoding() != null) {
+ map.put("properties.getContentEncoding()",
properties.getContentEncoding().toString());
+ }
+ if (properties.getGroupId() != null) {
+ map.put("properties.getGroupID()", properties.getGroupId());
+ }
+ if (properties.getGroupSequence() != null) {
+ map.put("properties.getGroupSequence()",
properties.getGroupSequence().intValue());
+ }
+ if (properties.getReplyToGroupId() != null) {
+ map.put("properties.getReplyToGroupId()",
properties.getReplyToGroupId());
+ }
+ }
+
return map;
}
+
+ /**
+ * Copies message annotations into {@code map} under keys prefixed with "annotation ".
+ * Does nothing when {@code annotations} or its value map is null.
+ * The delivery-time, delivery-delay and ingress-time annotations are stored as long values;
+ * any other annotation (or one of those three with a null value) is stored as-is.
+ */
+ protected static void addAnnotationsAsProperties(Map map,
MessageAnnotations annotations) {
+ if (annotations != null && annotations.getValue() != null) {
+ for (Map.Entry<?, ?> entry : annotations.getValue().entrySet()) {
+ String key = entry.getKey().toString();
+ if ("x-opt-delivery-time".equals(key) && entry.getValue() != null)
{
+ long deliveryTime = ((Number) entry.getValue()).longValue();
+ map.put("annotation x-opt-delivery-time", deliveryTime);
+ } else if ("x-opt-delivery-delay".equals(key) && entry.getValue()
!= null) {
+ long delay = ((Number) entry.getValue()).longValue();
+ // A non-positive delay is not reported. The stored value is "now + delay", i.e. it is
+ // computed at browse time. NOTE(review): presumably intended as an absolute time - confirm.
+ if (delay > 0) {
+ map.put("annotation x-opt-delivery-delay",
System.currentTimeMillis() + delay);
+ }
+ } else if (AMQPMessageSupport.X_OPT_INGRESS_TIME.equals(key) &&
entry.getValue() != null) {
+ map.put("annotation X_OPT_INGRESS_TIME", ((Number)
entry.getValue()).longValue());
+ } else {
+ // NOTE(review): the conversion exception is swallowed silently; whether put() on the
+ // supplied map can actually raise it is not visible here - confirm.
+ try {
+ map.put("annotation " + key, entry.getValue());
+ } catch (ActiveMQPropertyConversionException e) {
+ }
+ }
+ }
+ }
+ }
+
+
@Override
public ICoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
try {
@@ -1726,4 +1791,103 @@ public abstract class AMQPMessage extends
RefCountMessage implements org.apache.
public void setOwner(Object object) {
this.owner = object;
}
+
+
+ //
*******************************************************************************************************************************
+ // Composite Data implementation
+
+   // Shared factory describing AMQP messages as JMX open types; created once and never reassigned.
+   private static final MessageOpenTypeFactory<AMQPMessage> AMQP_FACTORY = new AmqpMessageOpenTypeFactory();
+
+   /**
+    * Open-type factory that renders an AMQP message for management browsing directly from its
+    * AMQP sections: adds a textual body and a message type to the common fields.
+    */
+   static class AmqpMessageOpenTypeFactory extends MessageOpenTypeFactory<AMQPMessage> {
+      @Override
+      protected void init() throws OpenDataException {
+         super.init();
+         addItem(CompositeDataConstants.TEXT_BODY, CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
+         addItem(CompositeDataConstants.TYPE, CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
+      }
+
+      /**
+       * Adds the derived message type and a string form of the body. Large messages get a fixed
+       * placeholder instead of their content.
+       *
+       * @param m              the message to render
+       * @param valueSizeLimit maximum number of body characters to expose; negative means no limit
+       * @param delivery       delivery count of the reference
+       */
+      @Override
+      public Map<String, Object> getFields(AMQPMessage m, int valueSizeLimit, int delivery) throws OpenDataException {
+         Map<String, Object> rc = super.getFields(m, valueSizeLimit, delivery);
+
+         if (!m.isLargeMessage()) {
+            m.ensureScanning();
+         }
+
+         Properties properties = m.getCurrentProperties();
+
+         byte type = getType(m, properties);
+
+         rc.put(CompositeDataConstants.TYPE, type);
+
+         if (m.isLargeMessage()) {
+            rc.put(CompositeDataConstants.TEXT_BODY, "... Large message ...");
+         } else {
+            final Object body = m.getBody();
+            // For an AmqpValue show the wrapped value, otherwise the section itself. String.valueOf
+            // keeps a null value from throwing, and JsonUtil.truncate (unlike a direct
+            // truncateString call) skips truncation when the limit is negative.
+            final Object content = body instanceof AmqpValue ? ((AmqpValue) body).getValue() : body;
+            rc.put(CompositeDataConstants.TEXT_BODY, JsonUtil.truncate(String.valueOf(content), valueSizeLimit));
+         }
+
+         return rc;
+      }
+
+      /**
+       * Derives a message type constant from the body section and the content-type property.
+       * Large messages report {@code DEFAULT_TYPE}; anything not recognised reports {@code BYTES_TYPE}.
+       */
+      private byte getType(AMQPMessage m, Properties properties) {
+         if (m.isLargeMessage()) {
+            return DEFAULT_TYPE;
+         }
+         byte type = BYTES_TYPE;
+
+         final Symbol contentType = properties != null ? properties.getContentType() : null;
+         final String contentTypeString = contentType != null ? contentType.toString() : null;
+         final Object body = m.getBody();
+
+         if (body instanceof Data) {
+            // Without a content-type there is nothing to inspect, so the data stays BYTES_TYPE
+            // (previously this dereferenced a null contentType).
+            if (contentType != null) {
+               if (contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+                  type = OBJECT_TYPE;
+               } else if (contentType.equals(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE)) {
+                  type = BYTES_TYPE;
+               } else {
+                  Charset charset = getCharsetForTextualContent(contentTypeString);
+                  if (StandardCharsets.UTF_8.equals(charset)) {
+                     type = TEXT_TYPE;
+                  }
+               }
+            }
+         } else if (body instanceof AmqpSequence) {
+            type = STREAM_TYPE;
+         } else if (body instanceof AmqpValue) {
+            Object value = ((AmqpValue) body).getValue();
+
+            if (value instanceof String) {
+               type = TEXT_TYPE;
+            } else if (value instanceof Binary) {
+               // A missing content-type means plain bytes rather than a NullPointerException.
+               if (contentType != null && contentType.equals(AMQPMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE)) {
+                  type = OBJECT_TYPE;
+               } else {
+                  type = BYTES_TYPE;
+               }
+            } else if (value instanceof List) {
+               type = STREAM_TYPE;
+            } else if (value instanceof Map) {
+               type = MAP_TYPE;
+            }
+         }
+         return type;
+      }
+   }
+
+   /**
+    * Renders this AMQP message as JMX composite data using the shared AMQP factory.
+    *
+    * @param fieldsLimit   size limit handed to the factory for truncating values
+    * @param deliveryCount delivery count handed to the factory
+    * @return the composite data built from the factory's fields and composite type
+    * @throws OpenDataException if the composite data cannot be built
+    */
+   @Override
+   public CompositeData toCompositeData(int fieldsLimit, int deliveryCount) throws OpenDataException {
+      final Map<String, Object> values = AMQP_FACTORY.getFields(this, fieldsLimit, deliveryCount);
+      return new CompositeDataSupport(AMQP_FACTORY.getCompositeType(), values);
+   }
+
+ // Composite Data implementation
+ //
*******************************************************************************************************************************
+
}
\ No newline at end of file
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 5643a78..fbb4d78 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -39,7 +39,6 @@ import
org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
-import
org.apache.activemq.artemis.core.management.impl.openmbean.OpenTypeSupport;
import org.apache.activemq.artemis.core.messagecounter.MessageCounter;
import
org.apache.activemq.artemis.core.messagecounter.impl.MessageCounterHelper;
import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -61,9 +60,12 @@ import
org.apache.activemq.artemis.selector.filter.Filterable;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.jboss.logging.Logger;
public class QueueControlImpl extends AbstractControl implements QueueControl {
+ private static final Logger logger =
Logger.getLogger(QueueControlImpl.class);
+
public static final int FLUSH_LIMIT = 500;
// Constants -----------------------------------------------------
@@ -1583,7 +1585,7 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
if (index >= start) {
- c.add(OpenTypeSupport.convert(ref,
attributeSizeLimit));
+
c.add(ref.getMessage().toCompositeData(attributeSizeLimit,
ref.getDeliveryCount()));
}
//we only increase the index if we add a message,
otherwise we could stop before we get to a filtered message
index++;
@@ -1600,7 +1602,8 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
}
return rc;
}
- } catch (ActiveMQException e) {
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
if (AuditLogger.isResourceLoggingEnabled()) {
AuditLogger.browseMessagesFailure(queue.getName().toString());
}
@@ -1635,7 +1638,7 @@ public class QueueControlImpl extends AbstractControl
implements QueueControl {
while (iterator.hasNext() && currentPageSize++ < limit) {
MessageReference ref = iterator.next();
if (thefilter == null || thefilter.match(ref.getMessage())) {
- c.add(OpenTypeSupport.convert(ref, attributeSizeLimit));
+
c.add(ref.getMessage().toCompositeData(attributeSizeLimit,
ref.getDeliveryCount()));
}
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
index a8cde03..2970c0c 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/CompositeDataConstants.java
@@ -1,12 +1,12 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,56 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.activemq.artemis.core.management.impl.openmbean;
-
-public interface CompositeDataConstants {
-
- String ADDRESS = "address";
- String MESSAGE_ID = "messageID";
- String USER_ID = "userID";
- String TYPE = "type";
- String DURABLE = "durable";
- String EXPIRATION = "expiration";
- String PRIORITY = "priority";
- String REDELIVERED = "redelivered";
- String TIMESTAMP = "timestamp";
- String BODY = "BodyPreview";
- String TEXT_BODY = "text";
- String LARGE_MESSAGE = "largeMessage";
- String PERSISTENT_SIZE = "persistentSize";
- String PROPERTIES = "PropertiesText";
- String ADDRESS_DESCRIPTION = "The Address";
- String MESSAGE_ID_DESCRIPTION = "The message ID";
- String USER_ID_DESCRIPTION = "The user ID";
- String TYPE_DESCRIPTION = "The message type";
- String DURABLE_DESCRIPTION = "Is the message durable";
- String EXPIRATION_DESCRIPTION = "The message expiration";
- String PRIORITY_DESCRIPTION = "The message priority";
- String REDELIVERED_DESCRIPTION = "Has the message been redelivered";
- String TIMESTAMP_DESCRIPTION = "The message timestamp";
- String BODY_DESCRIPTION = "The message body";
- String LARGE_MESSAGE_DESCRIPTION = "Is the message treated as a large
message";
- String PERSISTENT_SIZE_DESCRIPTION = "The message size when persisted on
disk";
- String PROPERTIES_DESCRIPTION = "The properties text";
-
- // User properties
- String STRING_PROPERTIES = "StringProperties";
- String BOOLEAN_PROPERTIES = "BooleanProperties";
- String BYTE_PROPERTIES = "ByteProperties";
- String SHORT_PROPERTIES = "ShortProperties";
- String INT_PROPERTIES = "IntProperties";
- String LONG_PROPERTIES = "LongProperties";
- String FLOAT_PROPERTIES = "FloatProperties";
- String DOUBLE_PROPERTIES = "DoubleProperties";
+package org.apache.activemq.artemis.core.management.impl.openmbean;
- String STRING_PROPERTIES_DESCRIPTION = "User String Properties";
- String BOOLEAN_PROPERTIES_DESCRIPTION = "User Boolean Properties";
- String BYTE_PROPERTIES_DESCRIPTION = "User Byte Properties";
- String SHORT_PROPERTIES_DESCRIPTION = "User Short Properties";
- String INT_PROPERTIES_DESCRIPTION = "User Int Properties";
- String LONG_PROPERTIES_DESCRIPTION = "User Long Properties";
- String FLOAT_PROPERTIES_DESCRIPTION = "User Float Properties";
- String DOUBLE_PROPERTIES_DESCRIPTION = "User Double Properties";
+/**
+ * @deprecated use
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants
+ */
+@Deprecated
+public interface CompositeDataConstants extends
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants {
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
deleted file mode 100644
index 21e9437..0000000
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupport.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.management.impl.openmbean;
-
-import javax.management.openmbean.ArrayType;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.CompositeDataSupport;
-import javax.management.openmbean.CompositeType;
-import javax.management.openmbean.OpenDataException;
-import javax.management.openmbean.OpenType;
-import javax.management.openmbean.SimpleType;
-import javax.management.openmbean.TabularDataSupport;
-import javax.management.openmbean.TabularType;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.api.core.JsonUtil;
-import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.MessageReference;
-
-public final class OpenTypeSupport {
-
- private static MessageOpenTypeFactory TEXT_FACTORY = new
TextMessageOpenTypeFactory();
- private static MessageOpenTypeFactory BYTES_FACTORY = new
BytesMessageOpenTypeFactory();
-
- private OpenTypeSupport() {
- }
-
- public static CompositeData convert(MessageReference ref, int
valueSizeLimit) throws OpenDataException {
- CompositeType ct;
-
- ICoreMessage message = ref.getMessage().toCore();
-
- Map<String, Object> fields;
- byte type = message.getType();
-
- switch(type) {
- case Message.TEXT_TYPE:
- ct = TEXT_FACTORY.getCompositeType();
- fields = TEXT_FACTORY.getFields(ref, valueSizeLimit);
- break;
- default:
- ct = BYTES_FACTORY.getCompositeType();
- fields = BYTES_FACTORY.getFields(ref, valueSizeLimit);
- break;
- }
- return new CompositeDataSupport(ct, fields);
- }
-
- static class MessageOpenTypeFactory {
-
- private CompositeType compositeType;
- private final List<String> itemNamesList = new ArrayList<>();
- private final List<String> itemDescriptionsList = new ArrayList<>();
- private final List<OpenType> itemTypesList = new ArrayList<>();
-
- protected TabularType stringPropertyTabularType;
- protected TabularType booleanPropertyTabularType;
- protected TabularType bytePropertyTabularType;
- protected TabularType shortPropertyTabularType;
- protected TabularType intPropertyTabularType;
- protected TabularType longPropertyTabularType;
- protected TabularType floatPropertyTabularType;
- protected TabularType doublePropertyTabularType;
- protected Object[][] typedPropertyFields;
-
- protected String getTypeName() {
- return Message.class.getName();
- }
-
- public CompositeType getCompositeType() throws OpenDataException {
- if (compositeType == null) {
- init();
- compositeType = createCompositeType();
- }
- return compositeType;
- }
-
- protected void init() throws OpenDataException {
-
- addItem(CompositeDataConstants.ADDRESS,
CompositeDataConstants.ADDRESS_DESCRIPTION, SimpleType.STRING);
- addItem(CompositeDataConstants.MESSAGE_ID,
CompositeDataConstants.MESSAGE_ID_DESCRIPTION, SimpleType.STRING);
- addItem(CompositeDataConstants.USER_ID,
CompositeDataConstants.USER_ID_DESCRIPTION, SimpleType.STRING);
- addItem(CompositeDataConstants.TYPE,
CompositeDataConstants.TYPE_DESCRIPTION, SimpleType.BYTE);
- addItem(CompositeDataConstants.DURABLE,
CompositeDataConstants.DURABLE_DESCRIPTION, SimpleType.BOOLEAN);
- addItem(CompositeDataConstants.EXPIRATION,
CompositeDataConstants.EXPIRATION_DESCRIPTION, SimpleType.LONG);
- addItem(CompositeDataConstants.PRIORITY,
CompositeDataConstants.PRIORITY_DESCRIPTION, SimpleType.BYTE);
- addItem(CompositeDataConstants.REDELIVERED,
CompositeDataConstants.REDELIVERED_DESCRIPTION, SimpleType.BOOLEAN);
- addItem(CompositeDataConstants.TIMESTAMP,
CompositeDataConstants.TIMESTAMP_DESCRIPTION, SimpleType.LONG);
- addItem(CompositeDataConstants.LARGE_MESSAGE,
CompositeDataConstants.LARGE_MESSAGE_DESCRIPTION, SimpleType.BOOLEAN);
- addItem(CompositeDataConstants.PERSISTENT_SIZE,
CompositeDataConstants.PERSISTENT_SIZE_DESCRIPTION, SimpleType.LONG);
-
- addItem(CompositeDataConstants.PROPERTIES,
CompositeDataConstants.PROPERTIES_DESCRIPTION, SimpleType.STRING);
-
- // now lets expose the type safe properties
- stringPropertyTabularType = createTabularType(String.class,
SimpleType.STRING);
- booleanPropertyTabularType = createTabularType(Boolean.class,
SimpleType.BOOLEAN);
- bytePropertyTabularType = createTabularType(Byte.class,
SimpleType.BYTE);
- shortPropertyTabularType = createTabularType(Short.class,
SimpleType.SHORT);
- intPropertyTabularType = createTabularType(Integer.class,
SimpleType.INTEGER);
- longPropertyTabularType = createTabularType(Long.class,
SimpleType.LONG);
- floatPropertyTabularType = createTabularType(Float.class,
SimpleType.FLOAT);
- doublePropertyTabularType = createTabularType(Double.class,
SimpleType.DOUBLE);
-
- addItem(CompositeDataConstants.STRING_PROPERTIES,
CompositeDataConstants.STRING_PROPERTIES_DESCRIPTION,
stringPropertyTabularType);
- addItem(CompositeDataConstants.BOOLEAN_PROPERTIES,
CompositeDataConstants.BOOLEAN_PROPERTIES_DESCRIPTION,
booleanPropertyTabularType);
- addItem(CompositeDataConstants.BYTE_PROPERTIES,
CompositeDataConstants.BYTE_PROPERTIES_DESCRIPTION, bytePropertyTabularType);
- addItem(CompositeDataConstants.SHORT_PROPERTIES,
CompositeDataConstants.SHORT_PROPERTIES_DESCRIPTION, shortPropertyTabularType);
- addItem(CompositeDataConstants.INT_PROPERTIES,
CompositeDataConstants.INT_PROPERTIES_DESCRIPTION, intPropertyTabularType);
- addItem(CompositeDataConstants.LONG_PROPERTIES,
CompositeDataConstants.LONG_PROPERTIES_DESCRIPTION, longPropertyTabularType);
- addItem(CompositeDataConstants.FLOAT_PROPERTIES,
CompositeDataConstants.FLOAT_PROPERTIES_DESCRIPTION, floatPropertyTabularType);
- addItem(CompositeDataConstants.DOUBLE_PROPERTIES,
CompositeDataConstants.DOUBLE_PROPERTIES_DESCRIPTION,
doublePropertyTabularType);
-
- typedPropertyFields = new Object[][] {
- {CompositeDataConstants.STRING_PROPERTIES,
stringPropertyTabularType, String.class},
- {CompositeDataConstants.BOOLEAN_PROPERTIES,
booleanPropertyTabularType, Boolean.class},
- {CompositeDataConstants.BYTE_PROPERTIES, bytePropertyTabularType,
Byte.class},
- {CompositeDataConstants.SHORT_PROPERTIES,
shortPropertyTabularType, Short.class},
- {CompositeDataConstants.INT_PROPERTIES, intPropertyTabularType,
Integer.class},
- {CompositeDataConstants.LONG_PROPERTIES, longPropertyTabularType,
Long.class},
- {CompositeDataConstants.FLOAT_PROPERTIES,
floatPropertyTabularType, Float.class},
- {CompositeDataConstants.DOUBLE_PROPERTIES,
doublePropertyTabularType, Double.class}
- };
-
- }
-
- public Map<String, Object> getFields(MessageReference ref, int
valueSizeLimit) throws OpenDataException {
- Map<String, Object> rc = new HashMap<>();
- ICoreMessage m = ref.getMessage().toCore();
- rc.put(CompositeDataConstants.MESSAGE_ID, "" + m.getMessageID());
- if (m.getUserID() != null) {
- rc.put(CompositeDataConstants.USER_ID, "ID:" +
m.getUserID().toString());
- } else {
- rc.put(CompositeDataConstants.USER_ID, "");
- }
- rc.put(CompositeDataConstants.ADDRESS, m.getAddress() == null ? "" :
m.getAddress().toString());
- rc.put(CompositeDataConstants.TYPE, m.getType());
- rc.put(CompositeDataConstants.DURABLE, m.isDurable());
- rc.put(CompositeDataConstants.EXPIRATION, m.getExpiration());
- rc.put(CompositeDataConstants.TIMESTAMP, m.getTimestamp());
- rc.put(CompositeDataConstants.PRIORITY, m.getPriority());
- rc.put(CompositeDataConstants.REDELIVERED, ref.getDeliveryCount() >
1);
- rc.put(CompositeDataConstants.LARGE_MESSAGE, m.isLargeMessage());
- try {
- rc.put(CompositeDataConstants.PERSISTENT_SIZE,
m.getPersistentSize());
- } catch (final ActiveMQException e1) {
- rc.put(CompositeDataConstants.PERSISTENT_SIZE, -1);
- }
-
- Map<String, Object> propertyMap = m.toPropertyMap(valueSizeLimit);
-
- rc.put(CompositeDataConstants.PROPERTIES, JsonUtil.truncate("" +
propertyMap, valueSizeLimit));
-
- // only populate if there are some values
- TabularDataSupport tabularData;
- for (Object[] typedPropertyInfo : typedPropertyFields) {
- tabularData = null;
- try {
- tabularData = createTabularData(propertyMap, (TabularType)
typedPropertyInfo[1], (Class) typedPropertyInfo[2]);
- } catch (Exception ignored) {
- }
- if (tabularData != null && !tabularData.isEmpty()) {
- rc.put((String) typedPropertyInfo[0], tabularData);
- } else {
- rc.put((String) typedPropertyInfo[0], null);
- }
- }
- return rc;
- }
-
- protected String toString(Object value) {
- if (value == null) {
- return null;
- }
- return value.toString();
- }
-
- protected CompositeType createCompositeType() throws OpenDataException {
- String[] itemNames = itemNamesList.toArray(new
String[itemNamesList.size()]);
- String[] itemDescriptions = itemDescriptionsList.toArray(new
String[itemDescriptionsList.size()]);
- OpenType[] itemTypes = itemTypesList.toArray(new
OpenType[itemTypesList.size()]);
- return new CompositeType(getTypeName(), getDescription(), itemNames,
itemDescriptions, itemTypes);
- }
-
- protected String getDescription() {
- return getTypeName();
- }
-
- protected <T> TabularType createTabularType(Class<T> type, OpenType
openType) throws OpenDataException {
- String typeName = "java.util.Map<java.lang.String, " + type.getName()
+ ">";
- String[] keyValue = new String[]{"key", "value"};
- OpenType[] openTypes = new OpenType[]{SimpleType.STRING, openType};
- CompositeType rowType = new CompositeType(typeName, typeName,
keyValue, keyValue, openTypes);
- return new TabularType(typeName, typeName, rowType, new
String[]{"key"});
- }
-
- protected TabularDataSupport createTabularData(Map<String, Object>
entries,
- TabularType type,
- Class valueType) throws
IOException, OpenDataException {
- TabularDataSupport answer = new TabularDataSupport(type);
-
- for (String key : entries.keySet()) {
- Object value = entries.get(key);
- if (valueType.isInstance(value)) {
- CompositeDataSupport compositeData =
createTabularRowValue(type, key, value);
- answer.put(compositeData);
- } else if (valueType == String.class && value instanceof
SimpleString) {
- CompositeDataSupport compositeData =
createTabularRowValue(type, key, value.toString());
- answer.put(compositeData);
- }
- }
- return answer;
- }
-
- protected CompositeDataSupport createTabularRowValue(TabularType type,
- String key,
- Object value)
throws OpenDataException {
- Map<String, Object> fields = new HashMap<>();
- fields.put("key", key);
- fields.put("value", value);
- return new CompositeDataSupport(type.getRowType(), fields);
- }
-
- protected void addItem(String name, String description, OpenType type) {
- itemNamesList.add(name);
- itemDescriptionsList.add(description);
- itemTypesList.add(type);
- }
- }
-
-
- static class BytesMessageOpenTypeFactory extends MessageOpenTypeFactory {
- protected ArrayType body;
-
- @Override
- protected void init() throws OpenDataException {
- super.init();
- body = new ArrayType(SimpleType.BYTE, true);
- addItem(CompositeDataConstants.BODY,
CompositeDataConstants.BODY_DESCRIPTION, body);
- }
-
- @Override
- public Map<String, Object> getFields(MessageReference ref, int
valueSizeLimit) throws OpenDataException {
- Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
- ICoreMessage m = ref.getMessage().toCore();
- if (!m.isLargeMessage()) {
- ActiveMQBuffer bodyCopy = m.getReadOnlyBodyBuffer();
- byte[] bytes = new byte[bodyCopy.readableBytes() <= valueSizeLimit
? bodyCopy.readableBytes() : valueSizeLimit + 1];
- bodyCopy.readBytes(bytes);
- rc.put(CompositeDataConstants.BODY, JsonUtil.truncate(bytes,
valueSizeLimit));
- } else {
- rc.put(CompositeDataConstants.BODY, new byte[0]);
- }
- return rc;
- }
- }
-
- static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
- protected SimpleType text;
-
- @Override
- protected void init() throws OpenDataException {
- super.init();
- addItem(CompositeDataConstants.TEXT_BODY,
CompositeDataConstants.TEXT_BODY, SimpleType.STRING);
- }
-
- @Override
- public Map<String, Object> getFields(MessageReference ref, int
valueSizeLimit) throws OpenDataException {
- Map<String, Object> rc = super.getFields(ref, valueSizeLimit);
- ICoreMessage m = ref.getMessage().toCore();
- if (!m.isLargeMessage()) {
- if (m.containsProperty(Message.HDR_LARGE_COMPRESSED)) {
- rc.put(CompositeDataConstants.TEXT_BODY, "[compressed]");
- } else {
- SimpleString text =
m.getReadOnlyBodyBuffer().readNullableSimpleString();
- rc.put(CompositeDataConstants.TEXT_BODY, text != null ?
JsonUtil.truncate(text.toString(), valueSizeLimit) : "");
- }
- } else {
- rc.put(CompositeDataConstants.TEXT_BODY, "[large message]");
- }
- return rc;
- }
- }
-}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java
index 1839fe5..877237d 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/management/impl/openmbean/OpenTypeSupportTest.java
@@ -20,7 +20,6 @@ package
org.apache.activemq.artemis.core.management.impl.openmbean;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
-import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.reader.TextMessageUtil;
import org.junit.Assert;
import org.junit.Test;
@@ -39,7 +38,7 @@ public class OpenTypeSupportTest {
TextMessageUtil.writeBodyText(coreMessage.getBodyBuffer(),
SimpleString.toSimpleString(bodyText));
- CompositeData cd = OpenTypeSupport.convert(new
MessageReferenceImpl(coreMessage, null), 256);
+ CompositeData cd = coreMessage.toCompositeData(256, 1);
Assert.assertEquals(bodyText, cd.get("text"));
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index d45bcff..66dbcff 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.management;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.json.JsonArray;
import javax.json.JsonObject;
import javax.management.Notification;
@@ -70,10 +76,12 @@ import
org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import
org.apache.activemq.artemis.tests.integration.jms.server.management.JMSUtil;
+import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.RandomUtil;
@@ -86,8 +94,8 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import static
org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.BODY;
-import static
org.apache.activemq.artemis.core.management.impl.openmbean.CompositeDataConstants.STRING_PROPERTIES;
+import static
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.BODY;
+import static
org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants.STRING_PROPERTIES;
@RunWith(value = Parameterized.class)
public class QueueControlTest extends ManagementTestBase {
@@ -3447,6 +3455,123 @@ public class QueueControlTest extends
ManagementTestBase {
Assert.assertEquals(new String(body), "theBody");
}
+
+ @Test
+ public void testSendMessageWithAMQP() throws Exception {
+ SimpleString address = new
SimpleString("address_testSendMessageWithAMQP");
+ SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP");
+
+ server.addAddressInfo(new
AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
+
+ Wait.assertTrue(() -> server.locateQueue(queue) != null &&
server.getAddressInfo(address) != null);
+
+ QueueControl queueControl = createManagementControl(address, queue,
RoutingType.ANYCAST);
+
+ { // a namespace
+ ConnectionFactory factory = CFUtil.createConnectionFactory("amqp",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection("myUser",
"myPassword")) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(address.toString()));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage message = session.createTextMessage("theAMQPBody");
+ message.setStringProperty("protocolUsed", "amqp");
+ producer.send(message);
+ }
+ }
+
+ { // a namespace
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection("myUser",
"myPassword")) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(address.toString()));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage message = session.createTextMessage("theCoreBody");
+ message.setStringProperty("protocolUsed", "core");
+ producer.send(message);
+ }
+ }
+
+ Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100);
+
+ // the message IDs are set on the server
+ CompositeData[] browse = queueControl.browse(null);
+
+ Assert.assertEquals(2, browse.length);
+
+ String body = (String) browse[0].get("text");
+
+ Assert.assertNotNull(body);
+
+ Assert.assertEquals("theAMQPBody", body);
+
+ body = (String) browse[1].get("text");
+
+ Assert.assertNotNull(body);
+
+ Assert.assertEquals("theCoreBody", body);
+
+ }
+
+
+ @Test
+ public void testSendMessageWithAMQPLarge() throws Exception {
+ SimpleString address = new
SimpleString("address_testSendMessageWithAMQP");
+ SimpleString queue = new SimpleString("queue_testSendMessageWithAMQP");
+
+ server.addAddressInfo(new
AddressInfo(address).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+ server.createQueue(new
QueueConfiguration(queue).setAddress(address).setDurable(durable).setRoutingType(RoutingType.ANYCAST));
+
+ Wait.assertTrue(() -> server.locateQueue(queue) != null &&
server.getAddressInfo(address) != null);
+
+ QueueControl queueControl = createManagementControl(address, queue,
RoutingType.ANYCAST);
+
+ StringBuffer bufferLarge = new StringBuffer();
+ for (int i = 0; i < 100 * 1024; i++) {
+ bufferLarge.append("*-");
+ }
+
+ { // a namespace
+ ConnectionFactory factory = CFUtil.createConnectionFactory("amqp",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection("myUser",
"myPassword")) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(address.toString()));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage message =
session.createTextMessage(bufferLarge.toString());
+ message.setStringProperty("protocolUsed", "amqp");
+ producer.send(message);
+ }
+ }
+
+ { // a namespace
+ ConnectionFactory factory = CFUtil.createConnectionFactory("core",
"tcp://localhost:61616");
+ try (Connection connection = factory.createConnection("myUser",
"myPassword")) {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer =
session.createProducer(session.createQueue(address.toString()));
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+ TextMessage message =
session.createTextMessage(bufferLarge.toString());
+ message.setStringProperty("protocolUsed", "core");
+ producer.send(message);
+ }
+ }
+
+ Wait.assertEquals(2L, () -> getMessageCount(queueControl), 2000, 100);
+
+ // the message IDs are set on the server
+ CompositeData[] browse = queueControl.browse(null);
+
+ Assert.assertEquals(2, browse.length);
+
+ String body = (String) browse[0].get("text");
+
+ Assert.assertNotNull(body);
+
+ body = (String) browse[1].get("text");
+
+ Assert.assertNotNull(body);
+
+ }
+
@Test
public void testSendMessageWithMessageId() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
@@ -3763,7 +3888,7 @@ public class QueueControlTest extends ManagementTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
- Configuration conf =
createDefaultInVMConfig().setJMXManagementEnabled(true);
+ Configuration conf =
createDefaultConfig(true).setJMXManagementEnabled(true);
server = addServer(ActiveMQServers.newActiveMQServer(conf, mbeanServer,
true));
server.start();