[ 
https://issues.apache.org/jira/browse/ARTEMIS-4657?focusedWorklogId=907349&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-907349
 ]

ASF GitHub Bot logged work on ARTEMIS-4657:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Feb/24 13:42
            Start Date: 28/Feb/24 13:42
    Worklog Time Spent: 10m 
      Work Description: gemmellr commented on code in PR #4833:
URL: https://github.com/apache/activemq-artemis/pull/4833#discussion_r1505615830


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java:
##########
@@ -1562,7 +1562,7 @@ public final Object getObjectProperty(String key) {
             return getAMQPUserID();
          case MessageUtil.CORRELATIONID_HEADER_NAME_STRING:
             if (properties != null && properties.getCorrelationId() != null) {
-               return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId());
+               return AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId());

Review Comment:
   So this is potentially going to break some existing usage. Was it decided 
that's OK, and also decided not to offer a way to restore the prior behaviour?



##########
artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java:
##########
@@ -159,7 +160,9 @@ public static org.apache.activemq.artemis.api.core.Message inbound(final Message
       coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID, messageSend.getCommandId());
       final String corrId = messageSend.getCorrelationId();
       if (corrId != null) {
-         coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId));
+         // this mimics what the OpenWire JMS client will do when it writes the correlation ID before sending
+         byte[] bytes = corrId.getBytes(StandardCharsets.UTF_8);
+         coreMessage.setCorrelationID(bytes);

Review Comment:
   This also seems like it is going to break a bunch of things. Anything that 
got a String before will now get bytes/Binary instead, even though a String is 
almost certainly what was sent originally.
   
   E.g. try sending a String correlation ID from the OpenWire JMS client and 
retrieving it as a String from the AMQP JMS client. Before, the consumer would 
see exactly what the original client sent, as a String. Now it will return an 
encoded binary hex string, since it will actually receive a Binary 
correlation ID instead of a String one?
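
   A minimal sketch of the concern above, under stated assumptions: the class 
   name, the sample ID and the `toHex` rendering are hypothetical stand-ins and 
   are not taken from the PR or from any client library. It only illustrates 
   that once a String ID is stored as UTF-8 bytes, a consumer that renders 
   binary IDs as hex no longer sees the text that was sent.

```java
import java.nio.charset.StandardCharsets;

public class CorrelationIdRoundTrip {

    // Mirrors the conversion in the diff above: the String ID becomes UTF-8 bytes.
    static byte[] storeAsBytes(String corrId) {
        return corrId.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical hex rendering; a real client may add a prefix or use another format.
    static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02X", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String sent = "order-42";
        byte[] stored = storeAsBytes(sent);

        // A consumer that treats the value as binary and renders it as hex
        // does not get the original text back.
        System.out.println(sent.equals(toHex(stored)));

        // The text is only recovered if the consumer knows the bytes are UTF-8.
        System.out.println(sent.equals(new String(stored, StandardCharsets.UTF_8)));
    }
}
```

   The second comparison succeeds only because the reader knows to decode the 
   bytes as UTF-8, which is exactly the knowledge a consumer on another 
   protocol does not have.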



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java:
##########
@@ -130,6 +130,8 @@ public String toCorrelationIdString(Object idObject) {
             // It has "ID:" prefix and doesn't have encoding prefix, use it as-is.
             return stringId;
          }
+      } else if (idObject instanceof Binary) {
+         return ((Binary)idObject).getArray();

Review Comment:
   Strictly speaking, it's possible the array isn't just the ID. The Binary 
should be checked to confirm it has no array offset and is the same length as 
the array.
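
   A sketch of the kind of check described above. To stay self-contained it 
   uses a hypothetical `BinaryView` stand-in (backing array, offset, length) 
   rather than the real Binary type, and the copy-on-mismatch fallback is an 
   assumption; the comment only asks for the check itself.

```java
import java.util.Arrays;

public class BinaryCopyExample {

    /** Hypothetical stand-in for a binary value that views part of a backing array. */
    static final class BinaryView {
        private final byte[] array;
        private final int offset;
        private final int length;

        BinaryView(byte[] array, int offset, int length) {
            this.array = array;
            this.offset = offset;
            this.length = length;
        }

        byte[] getArray() {
            return array;
        }

        int getArrayOffset() {
            return offset;
        }

        int getLength() {
            return length;
        }
    }

    // Return the backing array only when the view covers all of it;
    // otherwise copy out just the bytes that belong to the view.
    static byte[] toBytes(BinaryView binary) {
        if (binary.getArrayOffset() == 0 && binary.getLength() == binary.getArray().length) {
            return binary.getArray();
        }
        int start = binary.getArrayOffset();
        return Arrays.copyOfRange(binary.getArray(), start, start + binary.getLength());
    }

    public static void main(String[] args) {
        byte[] backing = {9, 1, 2, 3, 9};
        BinaryView partial = new BinaryView(backing, 1, 3);
        System.out.println(Arrays.toString(toBytes(partial)));
    }
}
```

   Returning the backing array directly in the whole-array case avoids a copy, 
   at the cost of sharing a mutable array with the caller.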



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.tests.util.RandomUtil;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test that correlation ID is handled as expected between JMS clients.
+ */
+public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport {
+
+   private void testCorrelationIDAsBytesSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
+      Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+
+      byte[] bytes = new byte[0xf + 1];
+      for (int i = 0; i <= 0xf; i++) {
+         bytes[i] = (byte) i;
+      }
+
+      MessageProducer producer = session.createProducer(queue);
+      Message message = session.createMessage();
+      message.setJMSCorrelationIDAsBytes(bytes);
+      producer.send(message);
+      producer.close();
+
+      Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+      final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
+
+      Message m = consumer.receive(5000);
+      Assert.assertNotNull("Could not receive message on consumer", m);
+
+      Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createCoreConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws Throwable {
+      testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createCoreConnection());
+   }
+
+   private void testCorrelationIDAsStringSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
+      final String correlationId = RandomUtil.randomString();
+
+      Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue queue = session.createQueue(getQueueName());
+
+      MessageProducer producer = session.createProducer(queue);
+      Message message = session.createMessage();
+      message.setJMSCorrelationID(correlationId);
+      producer.send(message);
+      producer.close();
+
+      Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
+      final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
+
+      Message m = consumer.receive(5000);
+      Assert.assertNotNull("Could not receive message on consumer", m);
+
+      Assert.assertEquals(correlationId, m.getJMSCorrelationID());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), createCoreConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), createConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createCoreConnection(), createOpenWireConnection());
+   }
+
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createOpenWireConnection());
+   }
+
+   /*
+    * JMS supports setting the correlation ID as a String or a byte[]. However, OpenWire only supports correlation ID as
+    * a String. When it is set as a byte[] the OpenWire JMS client just converts it to a UTF-8 encoded String, and
+    * therefore when it sends a JMS message with a correlation ID the broker can't tell if the value was set as a String
+    * or a byte[]. Due to this ambiguity the broker is hard-coded to treat the value as a byte[]. This doesn't cause any
+    * problems if the consumer is also OpenWire, but if the consumer is core or AMQP (which both differentiate between
+    * String and binary values) then retrieving the correlation ID as a String (i.e. via Message.getJMSCorrelationID())
+    * will fail.
+    *
+    * JMS means for the correlation ID as a byte[] to be used for "native" clients which makes it a good candidate for
+    * interoperability between other protocols like MQTT 5 which *only* supports correlation ID as byte[].
+    */
+   @Ignore
+   @Test(timeout = 60000)
+   public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws Throwable {
+      testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createConnection());
+   }

Review Comment:
   It seems you actually hit the interop issues I commented on from looking 
at the code.
   
   Why is the broker 'hard coded to byte[]' for OpenWire when this comment 
explicitly notes that OpenWire effectively only does String? Why isn't it hard 
coded to use String, like it was before?
   
   If MQTT only supports byte[] then it seems like it is the MQTT code that 
should be jumping through hoops such as converting to/from UTF-8 bytes, not 
the OpenWire bits, especially as doing it this way breaks the typical and 
existing OpenWire<->AMQP/Core interop.
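
   One way to picture the alternative suggested above, as a rough sketch 
   only: the class and method names are hypothetical and nothing here is taken 
   from the broker's MQTT code. The String form stays canonical, and the 
   byte[]-only side converts at its own boundary, using a strict decoder so 
   that bytes which are not valid UTF-8 are reported rather than silently 
   mangled.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

public class MqttCorrelationBridge {

    // Outbound to a byte[]-only protocol: encode the String ID as UTF-8.
    static byte[] toCorrelationData(String corrId) {
        return corrId.getBytes(StandardCharsets.UTF_8);
    }

    // Inbound from a byte[]-only protocol: decode strictly, and return empty
    // when the bytes are not valid UTF-8 so the caller can keep them as binary.
    static Optional<String> fromCorrelationData(byte[] data) {
        try {
            String decoded = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(data))
                .toString();
            return Optional.of(decoded);
        } catch (CharacterCodingException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        byte[] wire = toCorrelationData("request-7");
        System.out.println(fromCorrelationData(wire).isPresent());
        System.out.println(fromCorrelationData(new byte[] {(byte) 0xff, (byte) 0xfe}).isPresent());
    }
}
```

   Whether undecodable bytes should then be kept as a binary correlation ID or 
   rejected is a policy question this sketch leaves open.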





Issue Time Tracking
-------------------

    Worklog Id:     (was: 907349)
    Time Spent: 0.5h  (was: 20m)

> Support correlation ID compatibility between JMS clients
> --------------------------------------------------------
>
>                 Key: ARTEMIS-4657
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4657
>             Project: ActiveMQ Artemis
>          Issue Type: Improvement
>            Reporter: Justin Bertram
>            Assignee: Justin Bertram
>            Priority: Major
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> Currently there are some use-cases with both {{String}} and {{byte[]}} values 
> of JMS correlation ID that don't work between Core, OpenWire, and AMQP. We 
> should support as many as possible.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
