[ https://issues.apache.org/jira/browse/ARTEMIS-4657?focusedWorklogId=907349&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-907349 ]
ASF GitHub Bot logged work on ARTEMIS-4657: ------------------------------------------- Author: ASF GitHub Bot Created on: 28/Feb/24 13:42 Start Date: 28/Feb/24 13:42 Worklog Time Spent: 10m Work Description: gemmellr commented on code in PR #4833: URL: https://github.com/apache/activemq-artemis/pull/4833#discussion_r1505615830 ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java: ########## @@ -1562,7 +1562,7 @@ public final Object getObjectProperty(String key) { return getAMQPUserID(); case MessageUtil.CORRELATIONID_HEADER_NAME_STRING: if (properties != null && properties.getCorrelationId() != null) { - return AMQPMessageIdHelper.INSTANCE.toCorrelationIdString(properties.getCorrelationId()); + return AMQPMessageIdHelper.INSTANCE.toCorrelationIdStringOrBytes(properties.getCorrelationId()); Review Comment: So this is actually going to potentially break some exiting usage. Was it decided thats ok? Plus not to offer ability to restore the prior behaviour? ########## artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java: ########## @@ -159,7 +160,9 @@ public static org.apache.activemq.artemis.api.core.Message inbound(final Message coreMessage.putIntProperty(OpenWireConstants.AMQ_MSG_COMMAND_ID, messageSend.getCommandId()); final String corrId = messageSend.getCorrelationId(); if (corrId != null) { - coreMessage.putStringProperty(OpenWireConstants.JMS_CORRELATION_ID_PROPERTY, new SimpleString(corrId)); + // this mimics what the OpenWire JMS client will do when it writes the correlation ID before sending + byte[] bytes = corrId.getBytes(StandardCharsets.UTF_8); + coreMessage.setCorrelationID(bytes); Review Comment: This also seems like it is going to break a bunch of things. Stuff that got a String before, will now get bytes/Binary instead. Even though a String is almost certainly what was sent originally. E.g try sending a String CorrelationID from the OpenWire JMS client and retrieving a String CorrelationID from the AMQP JMS client. Before it would see exactly what the original client sent, as a String. Now it will now return an encoded binary hex since it will actually receive a Binary correlationID instead of a String one? ########## artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/AMQPMessageIdHelper.java: ########## @@ -130,6 +130,8 @@ public String toCorrelationIdString(Object idObject) { // It has "ID:" prefix and doesn't have encoding prefix, use it as-is. return stringId; } + } else if (idObject instanceof Binary) { + return ((Binary)idObject).getArray(); Review Comment: Strictly speaking, its possible the array isnt just the id...the Binary should be checked that it doesnt have an array offset and is the same length as the array. ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSCorrelationIDTest.java: ########## @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.artemis.tests.util.RandomUtil; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Test that correlation ID is handled as expected between JMS clients. + */ +public class JMSCorrelationIDTest extends MultiprotocolJMSClientTestSupport { + + private void testCorrelationIDAsBytesSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + + byte[] bytes = new byte[0xf + 1]; + for (int i = 0; i <= 0xf; i++) { + bytes[i] = (byte) i; + } + + MessageProducer producer = session.createProducer(queue); + Message message = session.createMessage(); + message.setJMSCorrelationIDAsBytes(bytes); + producer.send(message); + producer.close(); + + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); + + Message m = consumer.receive(5000); + Assert.assertNotNull("Could not receive message on consumer", m); + + Assert.assertArrayEquals(bytes, m.getJMSCorrelationIDAsBytes()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromAMQPToAMQP() throws Throwable { + testCorrelationIDAsBytesSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromAMQPToCore() throws Throwable { + testCorrelationIDAsBytesSendReceive(createConnection(), createCoreConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromAMQPToOpenWire() throws Throwable { + testCorrelationIDAsBytesSendReceive(createConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromCoreToCore() throws Throwable { + testCorrelationIDAsBytesSendReceive(createCoreConnection(), createCoreConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromCoreToAMQP() throws Throwable { + testCorrelationIDAsBytesSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromCoreToOpenWire() throws Throwable { + testCorrelationIDAsBytesSendReceive(createCoreConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromOpenWireToOpenWire() throws Throwable { + testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromOpenWireToAMQP() throws Throwable { + testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsBytesSendReceiveFromOpenWireToCore() throws Throwable { + testCorrelationIDAsBytesSendReceive(createOpenWireConnection(), createCoreConnection()); + } + + private void testCorrelationIDAsStringSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable { + final String correlationId = RandomUtil.randomString(); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + + MessageProducer producer = session.createProducer(queue); + Message message = session.createMessage(); + message.setJMSCorrelationID(correlationId); + producer.send(message); + producer.close(); + + Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerQueue = sessionConsumer.createQueue(getQueueName()); + final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue); + + Message m = consumer.receive(5000); + Assert.assertNotNull("Could not receive message on consumer", m); + + Assert.assertEquals(correlationId, m.getJMSCorrelationID()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromAMQPToAMQP() throws Throwable { + testCorrelationIDAsStringSendReceive(createConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromAMQPToCore() throws Throwable { + testCorrelationIDAsStringSendReceive(createConnection(), createCoreConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromAMQPToOpenWire() throws Throwable { + testCorrelationIDAsStringSendReceive(createConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromCoreToCore() throws Throwable { + testCorrelationIDAsStringSendReceive(createCoreConnection(), createCoreConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromCoreToAMQP() throws Throwable { + testCorrelationIDAsStringSendReceive(createCoreConnection(), createConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromCoreToOpenWire() throws Throwable { + testCorrelationIDAsStringSendReceive(createCoreConnection(), createOpenWireConnection()); + } + + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromOpenWireToOpenWire() throws Throwable { + testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createOpenWireConnection()); + } + + /* + * JMS supports setting the correlation ID as a String or a byte[]. However, OpenWire only supports correlation ID as + * a String. When it is set as a byte[] the OpenWire JMS client just converts it to a UTF-8 encoded String, and + * therefore when it sends a JMS message with a correlation ID the broker can't tell if the value was set as a String + * or a byte[]. Due to this ambiguity the broker is hard-coded to treat the value as a byte[]. This doesn't cause any + * problems if the consumer is also OpenWire, but if the consumer is core or AMQP (which both differentiate between + * String and binary values) then retrieving the correlation ID as a String (i.e. via Message.getJMSCorrelationID()) + * will fail. + * + * JMS means for the correlation ID as a byte[] to be used for "native" clients which makes it a good candidate for + * interoperability between other protocols like MQTT 5 which *only* supports correlation ID as byte[]. + */ + @Ignore + @Test(timeout = 60000) + public void testCorrelationIDAsStringSendReceiveFromOpenWireToAMQP() throws Throwable { + testCorrelationIDAsStringSendReceive(createOpenWireConnection(), createConnection()); + } Review Comment: Seems that you actually hit the interop issues I commented on from looking at the code. Why is the broker 'hard coded to byte[]' for Openwire when this comment explicitly notes it effectively only does String? Why isnt it hard coded to using String...like it was before? If MQTT only supports byte[] then it seem like it is the MQTT stuff that should be jumping through hoops such as converting to/from a UTF-8 bytes, not the Openwire bits, especially as doing it this way breaks the typical+existing Openwire<->AMQP/Core interop. Issue Time Tracking ------------------- Worklog Id: (was: 907349) Time Spent: 0.5h (was: 20m) > Support correlation ID compatibility between JMS clients > -------------------------------------------------------- > > Key: ARTEMIS-4657 > URL: https://issues.apache.org/jira/browse/ARTEMIS-4657 > Project: ActiveMQ Artemis > Issue Type: Improvement > Reporter: Justin Bertram > Assignee: Justin Bertram > Priority: Major > Time Spent: 0.5h > Remaining Estimate: 0h > > Currently there are some use-cases with both {{String}} and {{byte[]}} values > of JMS correlation ID that don't work between Core, OpenWire, and AMQP. We > should support as many as possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)