[ https://issues.apache.org/jira/browse/ARTEMIS-4259?focusedWorklogId=864214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-864214 ]
ASF GitHub Bot logged work on ARTEMIS-4259: ------------------------------------------- Author: ASF GitHub Bot Created on: 07/Jun/23 14:55 Start Date: 07/Jun/23 14:55 Worklog Time Spent: 10m Work Description: clebertsuconic commented on code in PR #4502: URL: https://github.com/apache/activemq-artemis/pull/4502#discussion_r1221742436 ########## tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.jms.multiprotocol; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.CompositeAddress; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Test; + +public class JMSFQQNConsumerTest extends MultiprotocolJMSClientTestSupport { + + @Test + public void testFQQNTopicConsumerWithSelectorAMQP() throws Exception { + testFQQNTopicConsumerWithSelector("AMQP", true); + } + + @Test + public void testFQQNTopicConsumerWithSelectorOpenWire() throws Exception { + testFQQNTopicConsumerWithSelector("OPENWIRE", false); + } + + @Test + public void testFQQNTopicConsumerWithSelectorCore() throws Exception { + testFQQNTopicConsumerWithSelector("CORE", true); + } + + private void testFQQNTopicConsumerWithSelector(String protocol, boolean validateFilterChange) throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + final String queue = "queue"; + final String address = "address"; + final String filter = "prop='value'"; + try (Connection c = factory.createConnection()) { + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + MessageConsumer mc = s.createConsumer(t, filter); + Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100); + org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(SimpleString.toSimpleString(queue)); + Assert.assertNotNull(serverQueue); + Assert.assertEquals(RoutingType.MULTICAST, serverQueue.getRoutingType()); + Assert.assertNotNull(serverQueue.getFilter()); + assertEquals(filter, server.locateQueue(queue).getFilter().getFilterString().toString()); + + MessageProducer producer = s.createProducer(s.createTopic("address")); + Message message = s.createTextMessage("hello"); + message.setStringProperty("prop", "value"); + producer.send(message); + Assert.assertNotNull(mc.receive(5000)); + message = s.createTextMessage("hello"); + message.setStringProperty("prop", "novalue"); + producer.send(message); + Assert.assertNull(mc.receiveNoWait()); + } + + if (validateFilterChange) { + boolean thrownException = false; + try (Connection c = factory.createConnection()) { + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + MessageConsumer mc = s.createConsumer(t, "shouldThrowException=true"); + } catch (Exception e) { + thrownException = true; + } + Assert.assertTrue(thrownException); + } + } + + @Test + public void testFQQNTopicConsumerNoSelectorAMQP() throws Exception { + testFQQNTopicConsumerNoSelector("AMQP"); + } + + @Test + public void testFQQNTopicConsumerNoSelectorOpenWire() throws Exception { + testFQQNTopicConsumerNoSelector("OPENWIRE"); + } + + @Test + public void testFQQNTopicConsumerNoSelectorCore() throws Exception { + testFQQNTopicConsumerNoSelector("CORE"); + } + + private void testFQQNTopicConsumerNoSelector(String protocol) throws Exception { + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + final String queue = "queue"; + final String address = "address"; + try (Connection c = factory.createConnection()) { + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + 
MessageConsumer mc = s.createConsumer(t); + Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100); + assertNull(server.locateQueue(queue).getFilter()); + } + } + + @Test + public void testFQQNTopicConsumerDontExistAMQP() throws Exception { + testFQQNTopicConsumerDontExist("AMQP"); + } + + /* this commented out code is just to make a point that this test would not be valid in openwire. + As openwire is proceeding with creating a topic subscription. + Hence there's no need to test this over JMS1.1 with openWire + @Test + public void testFQQNTopicConsumerDontExistOPENWIRE() throws Exception { + testFQQNTopicConsumerDontExist("OPENWIRE"); + } */ + + @Test + public void testFQQNTopicConsumerDontExistCORE() throws Exception { + testFQQNTopicConsumerDontExist("CORE"); + } + + private void testFQQNTopicConsumerDontExist(String protocol) throws Exception { + AddressSettings settings = new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false); + server.getAddressSettingsRepository().clear(); + server.getAddressSettingsRepository().addMatch("#", settings); + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + final String queue = "queue"; + final String address = "address"; + final String filter = "prop='value'"; + boolean thrownException = false; + try (Connection c = factory.createConnection()) { + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, queue)); + MessageConsumer mc = s.createConsumer(t); + } catch (Exception e) { + thrownException = true; + } + + Assert.assertTrue(thrownException); + } + + @Test + public void testFQQNQueueConsumerWithSelectorAMQP() throws Exception { + testFQQNQueueConsumerWithSelector("AMQP"); + } + + @Test + public void testFQQNQueueConsumerWithSelectorOpenWire() throws Exception { + testFQQNQueueConsumerWithSelector("OPENWIRE"); + } + + @Test + public void 
testFQQNQueueConsumerWithSelectorCore() throws Exception { + testFQQNQueueConsumerWithSelector("CORE"); + } + + private void testFQQNQueueConsumerWithSelector(String protocol) throws Exception { + AddressSettings settings = new AddressSettings().setDefaultQueueRoutingType(RoutingType.ANYCAST).setDefaultAddressRoutingType(RoutingType.ANYCAST); + server.getAddressSettingsRepository().addMatch("#", settings); + final String queue = "myQueue"; + final String address = "address"; + final String filter = "prop='inside'"; + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:5672"); + try (Connection c = factory.createConnection()) { + c.start(); + Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueQuery = CompositeAddress.toFullyQualified(address, queue) + (protocol.equals("OPENWIRE") ? "?selectorAware=true" : ""); + Queue q = s.createQueue(queueQuery); + MessageConsumer mc = s.createConsumer(q, filter); + Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100); + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(SimpleString.toSimpleString(queue)); + Assert.assertEquals(RoutingType.ANYCAST, serverQueue.getRoutingType()); + Assert.assertNull(serverQueue.getFilter()); + MessageProducer p = s.createProducer(q); + Message m = s.createMessage(); + m.setStringProperty("prop", "inside"); + p.send(m); + assertNotNull(mc.receive(1000)); + m = s.createMessage(); + m.setStringProperty("prop", RandomUtil.randomString()); + assertNull(mc.receiveNoWait()); + + serverQueue.getConsumers().forEach(queueConsumer -> { + Assert.assertNotNull(queueConsumer.getFilter()); Review Comment: This test will be the only one. There's another test I added that validates filters; it validates them by matching the messages. 
Issue Time Tracking ------------------- Worklog Id: (was: 864214) Time Spent: 3h 20m (was: 3h 10m) > JMS consumer + FQQN + selector not working > ------------------------------------------ > > Key: ARTEMIS-4259 > URL: https://issues.apache.org/jira/browse/ARTEMIS-4259 > Project: ActiveMQ Artemis > Issue Type: Bug > Reporter: Justin Bertram > Assignee: Justin Bertram > Priority: Major > Time Spent: 3h 20m > Remaining Estimate: 0h > > The CORE protocol does not honor consumers with a filter when used on FQQN > topics. > Steps to reproduce: > Start a consumer using CLI: > {noformat} > ./artemis consumer --url tcp://localhost:61616 --destination > topic://topic1::topic1.queue1 --protocol=core --message-count 1 --filter > "foo='bar'" --verbose{noformat} > Send a message without the filter string: > {noformat} > ./artemis producer --url tcp://localhost:61616 --destination > topic://topic1::topic1.queue1 --protocol=core --message-count 1 --message > "some text"{noformat} > You can observe the CORE client consuming the message even though it does not > match the filter string: > {noformat} > Connection brokerURL = tcp://localhost:61616 > Consumer:: filter = foo='bar' > Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 wait until 1 messages > are consumed > Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Received some text > JMS Message ID:ID:a00cbea3-dda7-11ed-9c2d-b42e99ea6f5c > Received text sized at 9 > Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Consumed: 1 messages > Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Elapsed time in > second : 8 s > Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Elapsed time in milli > second : 8140 milli seconds > Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Consumed: 1 messages > Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Consumer thread > finished{noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)