This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 13df6a8 ARTEMIS-3457 log WARN for OpenWire property conversion problem
13df6a8 is described below
commit 13df6a8fb9522e53d875755be1d217ab3e260d65
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Sep 2 11:23:12 2021 -0500
ARTEMIS-3457 log WARN for OpenWire property conversion problem
While converting a core message to an OpenWire message there may be an
error processing a property value. Currently this results in an
exception and the message is not dispatched to the client. The broker
eventually attempts to redeliver this message resulting in the same
error. Instead of throwing an exception the broker should simply log a
WARN message and skip the property. This will allow clients to receive
the message without the problematic property and the broker will not
have to attempt to redeliver the message again.
---
.../openwire/OpenWireMessageConverter.java | 68 +++++++++++++---------
.../openwire/OpenWireMessageConverterTest.java | 26 +++++++++
.../artemis/core/server/ActiveMQServerLogger.java | 4 ++
3 files changed, 71 insertions(+), 27 deletions(-)
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 3cc1a16..66a8d66 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -47,6 +47,7 @@ import
org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants;
@@ -75,6 +76,8 @@ import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer;
+import static
org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP;
+
public final class OpenWireMessageConverter {
private static final SimpleString JMS_TYPE_PROPERTY = new
SimpleString("JMSType");
@@ -529,7 +532,7 @@ public final class OpenWireMessageConverter {
AMQConsumer consumer, UUID
serverNodeUUID) throws IOException {
final ActiveMQMessage amqMsg;
final byte coreType = coreMessage.getType();
- final Boolean compressProp = (Boolean)
coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+ final Boolean compressProp = getObjectProperty(coreMessage,
Boolean.class, AMQ_MSG_COMPRESSED);
final boolean isCompressed = compressProp != null && compressProp;
final byte[] bytes;
final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
@@ -564,7 +567,7 @@ public final class OpenWireMessageConverter {
throw new IllegalStateException("Unknown message type: " +
coreMessage.getType());
}
- final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
+ final String type = getObjectProperty(coreMessage, String.class,
JMS_TYPE_PROPERTY);
if (type != null) {
amqMsg.setJMSType(type);
}
@@ -573,7 +576,7 @@ public final class OpenWireMessageConverter {
amqMsg.setPriority(coreMessage.getPriority());
amqMsg.setTimestamp(coreMessage.getTimestamp());
- Long brokerInTime = (Long)
coreMessage.getObjectProperty(AMQ_MSG_BROKER_IN_TIME);
+ Long brokerInTime = getObjectProperty(coreMessage, Long.class,
AMQ_MSG_BROKER_IN_TIME);
if (brokerInTime == null) {
brokerInTime = 0L;
}
@@ -583,35 +586,35 @@ public final class OpenWireMessageConverter {
//we need check null because messages may come from other clients
//and those amq specific attribute may not be set.
- Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL);
+ Long arrival = getObjectProperty(coreMessage, Long.class,
AMQ_MSG_ARRIVAL);
if (arrival == null) {
//messages from other sources (like core client) may not set this prop
arrival = 0L;
}
amqMsg.setArrival(arrival);
- final Object brokerPath =
coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
- if (brokerPath instanceof SimpleString &&
((SimpleString)brokerPath).length() > 0) {
- setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString());
+ final SimpleString brokerPath = getObjectProperty(coreMessage,
SimpleString.class, AMQ_MSG_BROKER_PATH);
+ if (brokerPath != null && brokerPath.length() > 0) {
+ setAMQMsgBrokerPath(amqMsg, brokerPath.toString());
}
- final Object clusterPath =
coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
- if (clusterPath instanceof SimpleString &&
((SimpleString)clusterPath).length() > 0) {
- setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString());
+ final SimpleString clusterPath = getObjectProperty(coreMessage,
SimpleString.class, AMQ_MSG_CLUSTER);
+ if (clusterPath != null && clusterPath.length() > 0) {
+ setAMQMsgClusterPath(amqMsg, clusterPath.toString());
}
- Integer commandId = (Integer)
coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID);
+ Integer commandId = getObjectProperty(coreMessage, Integer.class,
AMQ_MSG_COMMAND_ID);
if (commandId == null) {
commandId = -1;
}
amqMsg.setCommandId(commandId);
- final SimpleString corrId = (SimpleString)
coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
+ final SimpleString corrId = getObjectProperty(coreMessage,
SimpleString.class, JMS_CORRELATION_ID_PROPERTY);
if (corrId != null) {
amqMsg.setCorrelationId(corrId.toString());
}
- final byte[] dsBytes = (byte[])
coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
+ final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class,
AMQ_MSG_DATASTRUCTURE);
if (dsBytes != null) {
setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
}
@@ -626,7 +629,7 @@ public final class OpenWireMessageConverter {
amqMsg.setGroupSequence(coreMessage.getGroupSequence());
- final byte[] midBytes = coreMessage.getBytesProperty(AMQ_MSG_MESSAGE_ID);
+ final byte[] midBytes = getObjectProperty(coreMessage, byte[].class,
AMQ_MSG_MESSAGE_ID);
final MessageId mid;
if (midBytes != null) {
ByteSequence midSeq = new ByteSequence(midBytes);
@@ -639,45 +642,45 @@ public final class OpenWireMessageConverter {
amqMsg.setMessageId(mid);
- final byte[] origDestBytes = (byte[])
coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
+ final byte[] origDestBytes = getObjectProperty(coreMessage,
byte[].class, AMQ_MSG_ORIG_DESTINATION);
if (origDestBytes != null) {
setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
}
- final byte[] origTxIdBytes = (byte[])
coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
+ final byte[] origTxIdBytes = getObjectProperty(coreMessage,
byte[].class, AMQ_MSG_ORIG_TXID);
if (origTxIdBytes != null) {
setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
}
- final byte[] producerIdBytes = (byte[])
coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
+ final byte[] producerIdBytes = getObjectProperty(coreMessage,
byte[].class, AMQ_MSG_PRODUCER_ID);
if (producerIdBytes != null) {
ProducerId producerId = (ProducerId) marshaller.unmarshal(new
ByteSequence(producerIdBytes));
amqMsg.setProducerId(producerId);
}
- final byte[] marshalledBytes = (byte[])
coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
+ final byte[] marshalledBytes = getObjectProperty(coreMessage,
byte[].class, AMQ_MSG_MARSHALL_PROP);
if (marshalledBytes != null) {
amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
}
amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
- final byte[] replyToBytes = (byte[])
coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
+ final byte[] replyToBytes = getObjectProperty(coreMessage, byte[].class,
AMQ_MSG_REPLY_TO);
if (replyToBytes != null) {
setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
}
- final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
- if (userId instanceof SimpleString && ((SimpleString)userId).length() >
0) {
- amqMsg.setUserID(((SimpleString)userId).toString());
+ final SimpleString userId = getObjectProperty(coreMessage,
SimpleString.class, AMQ_MSG_USER_ID);
+ if (userId != null && userId.length() > 0) {
+ amqMsg.setUserID(userId.toString());
}
- final Boolean isDroppable = (Boolean)
coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
+ final Boolean isDroppable = getObjectProperty(coreMessage,
Boolean.class, AMQ_MSG_DROPPABLE);
if (isDroppable != null) {
amqMsg.setDroppable(isDroppable);
}
- final SimpleString dlqCause = (SimpleString)
coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+ final SimpleString dlqCause = getObjectProperty(coreMessage,
SimpleString.class, AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
if (dlqCause != null) {
setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
}
@@ -687,7 +690,7 @@ public final class OpenWireMessageConverter {
setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
}
- final Long ingressTimestamp =
coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP)
?
coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP)
: null;
+ final Long ingressTimestamp = getObjectProperty(coreMessage, Long.class,
HDR_INGRESS_TIMESTAMP);
if (ingressTimestamp != null) {
setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp);
}
@@ -704,6 +707,17 @@ public final class OpenWireMessageConverter {
return amqMsg;
}
+ private static <T> T getObjectProperty(ICoreMessage message, Class<T> type,
SimpleString property) {
+ if (message.getPropertyNames().contains(property)) {
+ try {
+ return type.cast(message.getObjectProperty(property));
+ } catch (ClassCastException e) {
+
ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(property,
e.getMessage());
+ }
+ }
+ return null;
+ }
+
private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer,
final boolean isCompressed)
throws IOException {
byte[] bytes = null;
@@ -946,7 +960,7 @@ public final class OpenWireMessageConverter {
private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage
amqMsg,
final Long
ingressTimestamp) throws IOException {
try {
-
amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(),
ingressTimestamp);
+ amqMsg.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(),
ingressTimestamp);
} catch (JMSException e) {
throw new IOException("failure to set ingress timestamp property " +
ingressTimestamp, e);
}
@@ -973,7 +987,7 @@ public final class OpenWireMessageConverter {
amqMsg.setObjectProperty(keyStr, prop);
}
} catch (JMSException e) {
- throw new IOException("exception setting property " + s + " : " +
prop, e);
+ ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(s,
e.getMessage());
}
}
}
diff --git
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
index e1ecfd4..6231eef 100644
---
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
+++
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class OpenWireMessageConverterTest {
@@ -108,4 +109,29 @@ public class OpenWireMessageConverterTest {
assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey)
instanceof String);
}
+
+ @Test
+ public void testBadPropertyConversion() throws Exception {
+ final String hdrArrival = "__HDR_ARRIVAL";
+ final String hdrBrokerInTime = "__HDR_BROKER_IN_TIME";
+ final String hdrCommandId = "__HDR_COMMAND_ID";
+ final String hdrDroppable = "__HDR_DROPPABLE";
+
+ ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+ coreMessage.putStringProperty(hdrArrival, "1234");
+ coreMessage.putStringProperty(hdrBrokerInTime, "5678");
+ coreMessage.putStringProperty(hdrCommandId, "foo");
+ coreMessage.putStringProperty(hdrDroppable, "true");
+
+ MessageReference messageReference = new
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+ AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+
+ MessageDispatch messageDispatch =
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage,
openWireFormat, amqConsumer, nodeUUID);
+
+ assertNull(messageDispatch.getMessage().getProperty(hdrArrival));
+ assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime));
+ assertNull(messageDispatch.getMessage().getProperty(hdrCommandId));
+ assertNull(messageDispatch.getMessage().getProperty(hdrDroppable));
+ }
}
\ No newline at end of file
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index bb72f57..a056c84 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1751,6 +1751,10 @@ public interface ActiveMQServerLogger extends
BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void duplicateAddressSettingMatch(String match);
+ @LogMessage(level = Logger.Level.WARN)
+ @Message(id = 222302, value = "Failed to deal with property {0} when
converting message from core to OpenWire: {1}", format =
Message.Format.MESSAGE_FORMAT)
+ void failedToDealWithObjectProperty(SimpleString property, String
exceptionMessage);
+
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 224000, value = "Failure in initialisation", format =
Message.Format.MESSAGE_FORMAT)
void initializationError(@Cause Throwable e);