[ 
https://issues.apache.org/jira/browse/ARTEMIS-4259?focusedWorklogId=864214&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-864214
 ]

ASF GitHub Bot logged work on ARTEMIS-4259:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Jun/23 14:55
            Start Date: 07/Jun/23 14:55
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4502:
URL: https://github.com/apache/activemq-artemis/pull/4502#discussion_r1221742436


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSFQQNConsumerTest.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.CompositeAddress;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JMSFQQNConsumerTest extends MultiprotocolJMSClientTestSupport {
+
+   @Test
+   public void testFQQNTopicConsumerWithSelectorAMQP() throws Exception {
+      testFQQNTopicConsumerWithSelector("AMQP", true);
+   }
+
+   @Test
+   public void testFQQNTopicConsumerWithSelectorOpenWire() throws Exception {
+      testFQQNTopicConsumerWithSelector("OPENWIRE", false);
+   }
+
+   @Test
+   public void testFQQNTopicConsumerWithSelectorCore() throws Exception {
+      testFQQNTopicConsumerWithSelector("CORE", true);
+   }
+
+   private void testFQQNTopicConsumerWithSelector(String protocol, boolean 
validateFilterChange) throws Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+      final String queue = "queue";
+      final String address = "address";
+      final String filter = "prop='value'";
+      try (Connection c = factory.createConnection()) {
+         c.start();
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+         MessageConsumer mc = s.createConsumer(t, filter);
+         Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(SimpleString.toSimpleString(queue));
+         Assert.assertNotNull(serverQueue);
+         Assert.assertEquals(RoutingType.MULTICAST, 
serverQueue.getRoutingType());
+         Assert.assertNotNull(serverQueue.getFilter());
+         assertEquals(filter, 
server.locateQueue(queue).getFilter().getFilterString().toString());
+
+         MessageProducer producer = s.createProducer(s.createTopic("address"));
+         Message message = s.createTextMessage("hello");
+         message.setStringProperty("prop", "value");
+         producer.send(message);
+         Assert.assertNotNull(mc.receive(5000));
+         message = s.createTextMessage("hello");
+         message.setStringProperty("prop", "novalue");
+         producer.send(message);
+         Assert.assertNull(mc.receiveNoWait());
+      }
+
+      if (validateFilterChange) {
+         boolean thrownException = false;
+         try (Connection c = factory.createConnection()) {
+            c.start();
+            Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+            MessageConsumer mc = s.createConsumer(t, 
"shouldThrowException=true");
+         } catch (Exception e) {
+            thrownException = true;
+         }
+         Assert.assertTrue(thrownException);
+      }
+   }
+
+   @Test
+   public void testFQQNTopicConsumerNoSelectorAMQP() throws Exception {
+      testFQQNTopicConsumerNoSelector("AMQP");
+   }
+
+   @Test
+   public void testFQQNTopicConsumerNoSelectorOpenWire() throws Exception {
+      testFQQNTopicConsumerNoSelector("OPENWIRE");
+   }
+
+   @Test
+   public void testFQQNTopicConsumerNoSelectorCore() throws Exception {
+      testFQQNTopicConsumerNoSelector("CORE");
+   }
+
+   private void testFQQNTopicConsumerNoSelector(String protocol) throws 
Exception {
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+      final String queue = "queue";
+      final String address = "address";
+      try (Connection c = factory.createConnection()) {
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+         MessageConsumer mc = s.createConsumer(t);
+         Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
+         assertNull(server.locateQueue(queue).getFilter());
+      }
+   }
+
+   @Test
+   public void testFQQNTopicConsumerDontExistAMQP() throws Exception {
+      testFQQNTopicConsumerDontExist("AMQP");
+   }
+
+   /* this commented out code is just to make a point that this test would not 
be valid in openwire.
+      As openwire is proceeding with creating a topic subscription.
+      Hence there's no need to test this over JMS1.1 with openWire
+   @Test
+   public void testFQQNTopicConsumerDontExistOPENWIRE() throws Exception {
+      testFQQNTopicConsumerDontExist("OPENWIRE");
+   } */
+
+   @Test
+   public void testFQQNTopicConsumerDontExistCORE() throws Exception {
+      testFQQNTopicConsumerDontExist("CORE");
+   }
+
+   private void testFQQNTopicConsumerDontExist(String protocol) throws 
Exception {
+      AddressSettings settings = new 
AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false);
+      server.getAddressSettingsRepository().clear();
+      server.getAddressSettingsRepository().addMatch("#", settings);
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+      final String queue = "queue";
+      final String address = "address";
+      final String filter = "prop='value'";
+      boolean thrownException = false;
+      try (Connection c = factory.createConnection()) {
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic t = s.createTopic(CompositeAddress.toFullyQualified(address, 
queue));
+         MessageConsumer mc = s.createConsumer(t);
+      } catch (Exception e) {
+         thrownException = true;
+      }
+
+      Assert.assertTrue(thrownException);
+   }
+
+   @Test
+   public void testFQQNQueueConsumerWithSelectorAMQP() throws Exception {
+      testFQQNQueueConsumerWithSelector("AMQP");
+   }
+
+   @Test
+   public void testFQQNQueueConsumerWithSelectorOpenWire() throws Exception {
+      testFQQNQueueConsumerWithSelector("OPENWIRE");
+   }
+
+   @Test
+   public void testFQQNQueueConsumerWithSelectorCore() throws Exception {
+      testFQQNQueueConsumerWithSelector("CORE");
+   }
+
+   private void testFQQNQueueConsumerWithSelector(String protocol) throws 
Exception {
+      AddressSettings settings = new 
AddressSettings().setDefaultQueueRoutingType(RoutingType.ANYCAST).setDefaultAddressRoutingType(RoutingType.ANYCAST);
+      server.getAddressSettingsRepository().addMatch("#", settings);
+      final String queue = "myQueue";
+      final String address = "address";
+      final String filter = "prop='inside'";
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:5672");
+      try (Connection c = factory.createConnection()) {
+         c.start();
+         Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         String queueQuery = CompositeAddress.toFullyQualified(address, queue) 
+ (protocol.equals("OPENWIRE") ? "?selectorAware=true" : "");
+         Queue q = s.createQueue(queueQuery);
+         MessageConsumer mc = s.createConsumer(q, filter);
+         Wait.assertTrue(() -> server.locateQueue(queue) != null, 2000, 100);
+         org.apache.activemq.artemis.core.server.Queue serverQueue = 
server.locateQueue(SimpleString.toSimpleString(queue));
+         Assert.assertEquals(RoutingType.ANYCAST, 
serverQueue.getRoutingType());
+         Assert.assertNull(serverQueue.getFilter());
+         MessageProducer p = s.createProducer(q);
+         Message m = s.createMessage();
+         m.setStringProperty("prop", "inside");
+         p.send(m);
+         assertNotNull(mc.receive(1000));
+         m = s.createMessage();
+         m.setStringProperty("prop", RandomUtil.randomString());
+         assertNull(mc.receiveNoWait());
+
+         serverQueue.getConsumers().forEach(queueConsumer -> {
+            Assert.assertNotNull(queueConsumer.getFilter());

Review Comment:
   this test will be only one.. there's another test I added that is validating 
filters.
   
   It is validating the filters by matching the messages.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 864214)
    Time Spent: 3h 20m  (was: 3h 10m)

> JMS consumer + FQQN + selector not working
> ------------------------------------------
>
>                 Key: ARTEMIS-4259
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4259
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Justin Bertram
>            Assignee: Justin Bertram
>            Priority: Major
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>
> The CORE protocol does not honor consumers with a filter when used on FQQN 
> topics.
> Steps to reproduce:
> Start a consumer using CLI:
> {noformat}
> ./artemis consumer --url tcp://localhost:61616 --destination 
> topic://topic1::topic1.queue1 --protocol=core --message-count 1 --filter 
> "foo='bar'" --verbose{noformat}
> Send a message without the filter string:
> {noformat}
> ./artemis producer --url tcp://localhost:61616 --destination 
> topic://topic1::topic1.queue1 --protocol=core --message-count 1 --message 
> "some text"{noformat}
> You can observe the CORE client consuming the message which does not contain 
> the filter String:
> {noformat}
> Connection brokerURL = tcp://localhost:61616
> Consumer:: filter = foo='bar'
> Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 wait until 1 messages 
> are consumed
> Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Received some text
> JMS Message ID:ID:a00cbea3-dda7-11ed-9c2d-b42e99ea6f5c
> Received text sized at 9
> Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Consumed: 1 messages
> Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Elapsed time in 
> second : 8 s
> Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Elapsed time in milli 
> second : 8140 milli seconds
> Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Consumed: 1 messages
> Consumer ActiveMQTopic[topic1::topic1.queue1], thread=0 Consumer thread 
> finished{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to