This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 13df6a8  ARTEMIS-3457 log WARN for OpenWire property conversion problem
13df6a8 is described below

commit 13df6a8fb9522e53d875755be1d217ab3e260d65
Author: Justin Bertram <[email protected]>
AuthorDate: Thu Sep 2 11:23:12 2021 -0500

    ARTEMIS-3457 log WARN for OpenWire property conversion problem
    
    While converting a core message to an OpenWire message, there may be an
    error processing a property value. Currently, this results in an
    exception, and the message is not dispatched to the client. The broker
    eventually attempts to redeliver the message, which results in the same
    error. Instead of throwing an exception, the broker should simply log a
    WARN message and skip the property. This allows clients to receive
    the message without the problematic property, and the broker does not
    have to attempt redelivery.
---
 .../openwire/OpenWireMessageConverter.java         | 68 +++++++++++++---------
 .../openwire/OpenWireMessageConverterTest.java     | 26 +++++++++
 .../artemis/core/server/ActiveMQServerLogger.java  |  4 ++
 3 files changed, 71 insertions(+), 27 deletions(-)

diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 3cc1a16..66a8d66 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -47,6 +47,7 @@ import 
org.apache.activemq.artemis.core.message.impl.CoreMessage;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.DataConstants;
@@ -75,6 +76,8 @@ import org.apache.activemq.util.MarshallingSupport;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtbuf.UTF8Buffer;
 
+import static 
org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP;
+
 public final class OpenWireMessageConverter {
 
    private static final SimpleString JMS_TYPE_PROPERTY = new 
SimpleString("JMSType");
@@ -529,7 +532,7 @@ public final class OpenWireMessageConverter {
                                                AMQConsumer consumer, UUID 
serverNodeUUID) throws IOException {
       final ActiveMQMessage amqMsg;
       final byte coreType = coreMessage.getType();
-      final Boolean compressProp = (Boolean) 
coreMessage.getObjectProperty(AMQ_MSG_COMPRESSED);
+      final Boolean compressProp = getObjectProperty(coreMessage, 
Boolean.class, AMQ_MSG_COMPRESSED);
       final boolean isCompressed = compressProp != null && compressProp;
       final byte[] bytes;
       final ActiveMQBuffer buffer = coreMessage.getDataBuffer();
@@ -564,7 +567,7 @@ public final class OpenWireMessageConverter {
             throw new IllegalStateException("Unknown message type: " + 
coreMessage.getType());
       }
 
-      final String type = coreMessage.getStringProperty(JMS_TYPE_PROPERTY);
+      final String type = getObjectProperty(coreMessage, String.class, 
JMS_TYPE_PROPERTY);
       if (type != null) {
          amqMsg.setJMSType(type);
       }
@@ -573,7 +576,7 @@ public final class OpenWireMessageConverter {
       amqMsg.setPriority(coreMessage.getPriority());
       amqMsg.setTimestamp(coreMessage.getTimestamp());
 
-      Long brokerInTime = (Long) 
coreMessage.getObjectProperty(AMQ_MSG_BROKER_IN_TIME);
+      Long brokerInTime = getObjectProperty(coreMessage, Long.class, 
AMQ_MSG_BROKER_IN_TIME);
       if (brokerInTime == null) {
          brokerInTime = 0L;
       }
@@ -583,35 +586,35 @@ public final class OpenWireMessageConverter {
 
       //we need check null because messages may come from other clients
       //and those amq specific attribute may not be set.
-      Long arrival = (Long) coreMessage.getObjectProperty(AMQ_MSG_ARRIVAL);
+      Long arrival = getObjectProperty(coreMessage, Long.class, 
AMQ_MSG_ARRIVAL);
       if (arrival == null) {
          //messages from other sources (like core client) may not set this prop
          arrival = 0L;
       }
       amqMsg.setArrival(arrival);
 
-      final Object brokerPath = 
coreMessage.getObjectProperty(AMQ_MSG_BROKER_PATH);
-      if (brokerPath instanceof SimpleString && 
((SimpleString)brokerPath).length() > 0) {
-         setAMQMsgBrokerPath(amqMsg, ((SimpleString)brokerPath).toString());
+      final SimpleString brokerPath = getObjectProperty(coreMessage, 
SimpleString.class, AMQ_MSG_BROKER_PATH);
+      if (brokerPath != null && brokerPath.length() > 0) {
+         setAMQMsgBrokerPath(amqMsg, brokerPath.toString());
       }
 
-      final Object clusterPath = 
coreMessage.getObjectProperty(AMQ_MSG_CLUSTER);
-      if (clusterPath instanceof SimpleString && 
((SimpleString)clusterPath).length() > 0) {
-         setAMQMsgClusterPath(amqMsg, ((SimpleString)clusterPath).toString());
+      final SimpleString clusterPath = getObjectProperty(coreMessage, 
SimpleString.class, AMQ_MSG_CLUSTER);
+      if (clusterPath != null && clusterPath.length() > 0) {
+         setAMQMsgClusterPath(amqMsg, clusterPath.toString());
       }
 
-      Integer commandId = (Integer) 
coreMessage.getObjectProperty(AMQ_MSG_COMMAND_ID);
+      Integer commandId = getObjectProperty(coreMessage, Integer.class, 
AMQ_MSG_COMMAND_ID);
       if (commandId == null) {
          commandId = -1;
       }
       amqMsg.setCommandId(commandId);
 
-      final SimpleString corrId = (SimpleString) 
coreMessage.getObjectProperty(JMS_CORRELATION_ID_PROPERTY);
+      final SimpleString corrId = getObjectProperty(coreMessage, 
SimpleString.class, JMS_CORRELATION_ID_PROPERTY);
       if (corrId != null) {
          amqMsg.setCorrelationId(corrId.toString());
       }
 
-      final byte[] dsBytes = (byte[]) 
coreMessage.getObjectProperty(AMQ_MSG_DATASTRUCTURE);
+      final byte[] dsBytes = getObjectProperty(coreMessage, byte[].class, 
AMQ_MSG_DATASTRUCTURE);
       if (dsBytes != null) {
          setAMQMsgDataStructure(amqMsg, marshaller, dsBytes);
       }
@@ -626,7 +629,7 @@ public final class OpenWireMessageConverter {
 
       amqMsg.setGroupSequence(coreMessage.getGroupSequence());
 
-      final byte[] midBytes = coreMessage.getBytesProperty(AMQ_MSG_MESSAGE_ID);
+      final byte[] midBytes = getObjectProperty(coreMessage, byte[].class, 
AMQ_MSG_MESSAGE_ID);
       final MessageId mid;
       if (midBytes != null) {
          ByteSequence midSeq = new ByteSequence(midBytes);
@@ -639,45 +642,45 @@ public final class OpenWireMessageConverter {
 
       amqMsg.setMessageId(mid);
 
-      final byte[] origDestBytes = (byte[]) 
coreMessage.getObjectProperty(AMQ_MSG_ORIG_DESTINATION);
+      final byte[] origDestBytes = getObjectProperty(coreMessage, 
byte[].class, AMQ_MSG_ORIG_DESTINATION);
       if (origDestBytes != null) {
          setAMQMsgOriginalDestination(amqMsg, marshaller, origDestBytes);
       }
 
-      final byte[] origTxIdBytes = (byte[]) 
coreMessage.getObjectProperty(AMQ_MSG_ORIG_TXID);
+      final byte[] origTxIdBytes = getObjectProperty(coreMessage, 
byte[].class, AMQ_MSG_ORIG_TXID);
       if (origTxIdBytes != null) {
          setAMQMsgOriginalTransactionId(amqMsg, marshaller, origTxIdBytes);
       }
 
-      final byte[] producerIdBytes = (byte[]) 
coreMessage.getObjectProperty(AMQ_MSG_PRODUCER_ID);
+      final byte[] producerIdBytes = getObjectProperty(coreMessage, 
byte[].class, AMQ_MSG_PRODUCER_ID);
       if (producerIdBytes != null) {
          ProducerId producerId = (ProducerId) marshaller.unmarshal(new 
ByteSequence(producerIdBytes));
          amqMsg.setProducerId(producerId);
       }
 
-      final byte[] marshalledBytes = (byte[]) 
coreMessage.getObjectProperty(AMQ_MSG_MARSHALL_PROP);
+      final byte[] marshalledBytes = getObjectProperty(coreMessage, 
byte[].class, AMQ_MSG_MARSHALL_PROP);
       if (marshalledBytes != null) {
          amqMsg.setMarshalledProperties(new ByteSequence(marshalledBytes));
       }
 
       amqMsg.setRedeliveryCounter(reference.getDeliveryCount() - 1);
 
-      final byte[] replyToBytes = (byte[]) 
coreMessage.getObjectProperty(AMQ_MSG_REPLY_TO);
+      final byte[] replyToBytes = getObjectProperty(coreMessage, byte[].class, 
AMQ_MSG_REPLY_TO);
       if (replyToBytes != null) {
          setAMQMsgReplyTo(amqMsg, marshaller, replyToBytes);
       }
 
-      final Object userId = coreMessage.getObjectProperty(AMQ_MSG_USER_ID);
-      if (userId instanceof SimpleString && ((SimpleString)userId).length() > 
0) {
-         amqMsg.setUserID(((SimpleString)userId).toString());
+      final SimpleString userId = getObjectProperty(coreMessage, 
SimpleString.class, AMQ_MSG_USER_ID);
+      if (userId != null && userId.length() > 0) {
+         amqMsg.setUserID(userId.toString());
       }
 
-      final Boolean isDroppable = (Boolean) 
coreMessage.getObjectProperty(AMQ_MSG_DROPPABLE);
+      final Boolean isDroppable = getObjectProperty(coreMessage, 
Boolean.class, AMQ_MSG_DROPPABLE);
       if (isDroppable != null) {
          amqMsg.setDroppable(isDroppable);
       }
 
-      final SimpleString dlqCause = (SimpleString) 
coreMessage.getObjectProperty(AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
+      final SimpleString dlqCause = getObjectProperty(coreMessage, 
SimpleString.class, AMQ_MSG_DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY);
       if (dlqCause != null) {
          setAMQMsgDlqDeliveryFailureCause(amqMsg, dlqCause);
       }
@@ -687,7 +690,7 @@ public final class OpenWireMessageConverter {
          setAMQMsgHdrLastValueName(amqMsg, lastValueProperty);
       }
 
-      final Long ingressTimestamp = 
coreMessage.getPropertyNames().contains(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP)
 ? 
coreMessage.getLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP)
 : null;
+      final Long ingressTimestamp = getObjectProperty(coreMessage, Long.class, 
HDR_INGRESS_TIMESTAMP);
       if (ingressTimestamp != null) {
          setAMQMsgHdrIngressTimestamp(amqMsg, ingressTimestamp);
       }
@@ -704,6 +707,17 @@ public final class OpenWireMessageConverter {
       return amqMsg;
    }
 
+   private static <T> T getObjectProperty(ICoreMessage message, Class<T> type, 
SimpleString property) {
+      if (message.getPropertyNames().contains(property)) {
+         try {
+            return type.cast(message.getObjectProperty(property));
+         } catch (ClassCastException e) {
+            
ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(property, 
e.getMessage());
+         }
+      }
+      return null;
+   }
+
    private static byte[] toAMQMessageTextType(final ActiveMQBuffer buffer,
                                               final boolean isCompressed) 
throws IOException {
       byte[] bytes = null;
@@ -946,7 +960,7 @@ public final class OpenWireMessageConverter {
    private static void setAMQMsgHdrIngressTimestamp(final ActiveMQMessage 
amqMsg,
                                                     final Long 
ingressTimestamp) throws IOException {
       try {
-         
amqMsg.setLongProperty(org.apache.activemq.artemis.api.core.Message.HDR_INGRESS_TIMESTAMP.toString(),
 ingressTimestamp);
+         amqMsg.setLongProperty(HDR_INGRESS_TIMESTAMP.toString(), 
ingressTimestamp);
       } catch (JMSException e) {
          throw new IOException("failure to set ingress timestamp property " + 
ingressTimestamp, e);
       }
@@ -973,7 +987,7 @@ public final class OpenWireMessageConverter {
                amqMsg.setObjectProperty(keyStr, prop);
             }
          } catch (JMSException e) {
-            throw new IOException("exception setting property " + s + " : " + 
prop, e);
+            ActiveMQServerLogger.LOGGER.failedToDealWithObjectProperty(s, 
e.getMessage());
          }
       }
    }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
 
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
index e1ecfd4..6231eef 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverterTest.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class OpenWireMessageConverterTest {
@@ -108,4 +109,29 @@ public class OpenWireMessageConverterTest {
 
       assertTrue(messageDispatch.getMessage().getProperty(bytesPropertyKey) 
instanceof String);
    }
+
+   @Test
+   public void testBadPropertyConversion() throws Exception {
+      final String hdrArrival = "__HDR_ARRIVAL";
+      final String hdrBrokerInTime = "__HDR_BROKER_IN_TIME";
+      final String hdrCommandId = "__HDR_COMMAND_ID";
+      final String hdrDroppable = "__HDR_DROPPABLE";
+
+      ICoreMessage coreMessage = new CoreMessage().initBuffer(8);
+      coreMessage.putStringProperty(hdrArrival, "1234");
+      coreMessage.putStringProperty(hdrBrokerInTime, "5678");
+      coreMessage.putStringProperty(hdrCommandId, "foo");
+      coreMessage.putStringProperty(hdrDroppable, "true");
+
+      MessageReference messageReference = new 
MessageReferenceImpl(coreMessage, Mockito.mock(Queue.class));
+      AMQConsumer amqConsumer = Mockito.mock(AMQConsumer.class);
+      
Mockito.when(amqConsumer.getOpenwireDestination()).thenReturn(destination);
+
+      MessageDispatch messageDispatch = 
OpenWireMessageConverter.createMessageDispatch(messageReference, coreMessage, 
openWireFormat, amqConsumer, nodeUUID);
+
+      assertNull(messageDispatch.getMessage().getProperty(hdrArrival));
+      assertNull(messageDispatch.getMessage().getProperty(hdrBrokerInTime));
+      assertNull(messageDispatch.getMessage().getProperty(hdrCommandId));
+      assertNull(messageDispatch.getMessage().getProperty(hdrDroppable));
+   }
 }
\ No newline at end of file
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index bb72f57..a056c84 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1751,6 +1751,10 @@ public interface ActiveMQServerLogger extends 
BasicLogger {
       format = Message.Format.MESSAGE_FORMAT)
    void duplicateAddressSettingMatch(String match);
 
+   @LogMessage(level = Logger.Level.WARN)
+   @Message(id = 222302, value = "Failed to deal with property {0} when 
converting message from core to OpenWire: {1}", format = 
Message.Format.MESSAGE_FORMAT)
+   void failedToDealWithObjectProperty(SimpleString property, String 
exceptionMessage);
+
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 224000, value = "Failure in initialisation", format = 
Message.Format.MESSAGE_FORMAT)
    void initializationError(@Cause Throwable e);

Reply via email to