This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 4daefbf138 ARTEMIS-4792 Allow consumer priority to be added to receiver link address
4daefbf138 is described below

commit 4daefbf138bf920ca72e5f72fee442b676875e36
Author: Timothy Bish <[email protected]>
AuthorDate: Tue Jun 4 10:53:44 2024 -0400

    ARTEMIS-4792 Allow consumer priority to be added to receiver link address
    
    Allow the Source address to provide consumer priority on the address using the
    same option value as a core consumer '?consumer-priority=X'. The change parses
    any query string appended to an address and uses the address portion as the
    actual receiver address and currently only looks at consumer priority values in
    the extracted address query parameters and ignores any other options found. The
    existing consumer priority taken from link properties takes precedence over the
    value placed on the address query options if both are present.
---
 .../artemis/api/core/ParameterisedAddress.java     |  43 +++++++
 .../artemis/api/core/ParameterisedAddressTest.java |  51 ++++++++
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  74 ++++++++---
 .../artemis/protocol/amqp/proton/AmqpSupport.java  |  37 ++++++
 .../amqp/proton/DefaultSenderController.java       |  60 +++++++--
 .../protocol/amqp/proton/AmqpSupportTest.java      |  22 ++++
 .../integration/amqp/AmqpReceiverPriorityTest.java | 140 ++++++++++++++++++++-
 7 files changed, 395 insertions(+), 32 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java
index 4d25e70db3..a8af6d4564 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ParameterisedAddress.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.api.core;
 import static 
org.apache.activemq.artemis.utils.uri.URISupport.appendParameters;
 import static org.apache.activemq.artemis.utils.uri.URISupport.parseQuery;
 
+import java.util.Collections;
 import java.util.Map;
 
 import org.apache.activemq.artemis.utils.uri.URISupport;
@@ -106,4 +107,46 @@ public class ParameterisedAddress {
       return URISupport.containsQuery(address);
    }
 
+   public static SimpleString extractAddress(SimpleString address) {
+      return SimpleString.toSimpleString(extractAddress(address.toString()));
+   }
+
+   /**
+    * Given an address string, extract only the query portion if the address is
+    * parameterized, otherwise return an empty {@link Map}.
+    *
+    * @param address
+    *       The address to operate on.
+    *
+    * @return a {@link Map} containing the parameters associated with the 
given address.
+    */
+   @SuppressWarnings("unchecked")
+   public static Map<String, String> extractParameters(String address) {
+      final int index = address != null ? address.indexOf('?') : -1;
+
+      if (index == -1) {
+         return Collections.EMPTY_MAP;
+      } else {
+         return parseQuery(address);
+      }
+   }
+
+   /**
+    * Given an address string, extract only the address portion if the address 
is
+    * parameterized, otherwise just return the provided address.
+    *
+    * @param address
+    *       The address to operate on.
+    *
+    * @return the original address minus any appended parameters.
+    */
+   public static String extractAddress(String address) {
+      final int index = address != null ? address.indexOf('?') : -1;
+
+      if (index == -1) {
+         return address;
+      } else {
+         return address.substring(0, index);
+      }
+   }
 }
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/ParameterisedAddressTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/ParameterisedAddressTest.java
new file mode 100644
index 0000000000..2c009eecba
--- /dev/null
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/api/core/ParameterisedAddressTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.api.core;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.junit.jupiter.api.Test;
+
+public class ParameterisedAddressTest {
+
+   @Test
+   public void testExtractParameters() {
+      assertEquals(Collections.EMPTY_MAP, 
ParameterisedAddress.extractParameters(null));
+      assertEquals(Collections.EMPTY_MAP, 
ParameterisedAddress.extractParameters("noParams"));
+      assertNotEquals(Collections.EMPTY_MAP, 
ParameterisedAddress.extractParameters("noParams?param=X"));
+
+      final Map<String, String> params = 
ParameterisedAddress.extractParameters("noParams?param1=X&param2=Y");
+
+      assertEquals(2, params.size());
+
+      assertEquals("X", params.get("param1"));
+      assertEquals("Y", params.get("param2"));
+   }
+
+   @Test
+   public void testExtractAddress() {
+      assertNull(ParameterisedAddress.extractAddress((String) null));
+      assertEquals("noParams", 
ParameterisedAddress.extractAddress("noParams"));
+      assertEquals("noParams", 
ParameterisedAddress.extractAddress("noParams?param=X"));
+   }
+}
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 4612f7873b..a274bac726 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -16,8 +16,9 @@
  */
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.getReceiverPriority;
+
 import java.lang.invoke.MethodHandles;
-import java.util.Map;
 import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
@@ -83,8 +84,6 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   private static final Symbol PRIORITY = Symbol.getSymbol("priority");
-
    protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0);
 
    private final AMQPConnectionCallback protonSPI;
@@ -231,31 +230,68 @@ public class AMQPSessionCallback implements 
SessionCallback {
 
    }
 
-   public Object createSender(ProtonServerSenderContext protonSender,
-                              SimpleString queue,
-                              String filter,
-                              boolean browserOnly) throws Exception {
-      long consumerID = consumerIDGenerator.generateID();
-
-      filter = SelectorTranslator.convertToActiveMQFilterString(filter);
-
-      int priority = 
getPriority(protonSender.getSender().getRemoteProperties());
+   /**
+    * Creates a server consumer that reads from the given named queue and forwards the read messages to
+    * the AMQP sender to dispatch to the remote peer. The consumer priority value is extracted from the
+    * remote link properties that were assigned by the remote receiver.
+    *
+    * @param protonSender
+    *    The {@link ProtonServerSenderContext} that will be attached to the resulting consumer
+    * @param queue
+    *    The target queue that the consumer reads from.
+    * @param filter
+    *    The filter assigned to the consumer of the target queue.
+    * @param browserOnly
+    *    Should the consumer act as a browser on the target queue.
+    *
+    * @return a new {@link ServerConsumer} attached to the given queue.
+    *
+    * @throws Exception if an error occurs while creating the consumer instance.
+    */
+   public ServerConsumer createSender(ProtonServerSenderContext protonSender,
+                                      SimpleString queue,
+                                      String filter,
+                                      boolean browserOnly) throws Exception {
+      return createSender(protonSender, queue, filter, browserOnly, 
getReceiverPriority(protonSender.getSender().getRemoteProperties()));
+   }
 
-      ServerConsumer consumer = serverSession.createConsumer(consumerID, 
queue, SimpleString.toSimpleString(filter), priority, browserOnly, false, null);
+   /**
+    * Creates a server consumer that reads from the given named queue and forwards the read messages to
+    * the AMQP sender to dispatch to the remote peer.
+    *
+    * @param protonSender
+    *    The {@link ProtonServerSenderContext} that will be attached to the resulting consumer
+    * @param queue
+    *    The target queue that the consumer reads from.
+    * @param filter
+    *    The filter assigned to the consumer of the target queue.
+    * @param browserOnly
+    *    Should the consumer act as a browser on the target queue.
+    * @param priority
+    *    The priority to assign the new consumer (server defaults are used if not set).
+    *
+    * @return a new {@link ServerConsumer} attached to the given queue.
+    *
+    * @throws Exception if an error occurs while creating the consumer instance.
+    */
+   public ServerConsumer createSender(ProtonServerSenderContext protonSender,
+                                      SimpleString queue,
+                                      String filter,
+                                      boolean browserOnly,
+                                      Number priority) throws Exception {
+      final long consumerID = consumerIDGenerator.generateID();
+      final SimpleString filterString = 
SimpleString.toSimpleString(SelectorTranslator.convertToActiveMQFilterString(filter));
+      final int consumerPriority = priority != null ? priority.intValue() : 
ActiveMQDefaultConfiguration.getDefaultConsumerPriority();
+      final ServerConsumer consumer = serverSession.createConsumer(
+         consumerID, queue, filterString, consumerPriority, browserOnly, 
false, null);
 
       // AMQP handles its own flow control for when it's started
       consumer.setStarted(true);
-
       consumer.setProtocolContext(protonSender);
 
       return consumer;
    }
 
-   private int getPriority(Map<Symbol, Object> properties) {
-      Number value = properties == null ? null : (Number) 
properties.get(PRIORITY);
-      return value == null ? 
ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : value.intValue();
-   }
-
    public void startSender(Object brokerConsumer) throws Exception {
       ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer;
       // flow control is done at proton
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index 7cd7802f1f..c75bf0d71b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -115,6 +115,43 @@ public class AmqpSupport {
     */
    public static final String TUNNEL_CORE_MESSAGES = "tunnel-core-messages";
 
+   /**
+    * A priority value added to a remote receiver link attach that indicates 
the desired priority
+    * for the receiver created for the link.
+    */
+   public static final Symbol RECEIVER_PRIORITY = Symbol.getSymbol("priority");
+
+   /**
+    * Check the set of remote properties sent on link attach for any values 
that
+    * are treated as indicating priority and return the match if any. If no 
priority
+    * is indicated in the link properties this method returns null.
+    *
+    * @param remoteProperties
+    *       The {@link Map} of remote properties sent on the remote attach.
+    *
+    * @return a {@link Number} indicating the desired link priority or null if 
none.
+    */
+   public static Number getReceiverPriority(Map<Symbol, Object> 
remoteProperties) {
+      return getReceiverPriority(remoteProperties, null);
+   }
+
+   /**
+    * Check the set of remote properties sent on link attach for any values that
+    * are treated as indicating priority and return the match if any. If no priority
+    * is indicated in the link properties this method returns the provided default value.
+    *
+    * @param remoteProperties
+    *       The {@link Map} of remote properties sent on the remote attach.
+    * @param defaultPriority
+    *       The default value that should be returned if no remote priority indicated.
+    *
+    * @return a {@link Number} indicating the desired link priority or the given default if none.
+    */
+   public static Number getReceiverPriority(Map<Symbol, Object> 
remoteProperties, Number defaultPriority) {
+      final Number remotePriority = remoteProperties != null ? (Number) 
remoteProperties.get(RECEIVER_PRIORITY) : null;
+      return remotePriority != null ? remotePriority : defaultPriority;
+   }
+
    /**
     * Search for a given Symbol in a given array of Symbol object.
     *
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.java
index d33031c52a..21708b4e1c 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/DefaultSenderController.java
@@ -23,10 +23,12 @@ import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.QUEUE
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SHARED;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.TOPIC_CAPABILITY;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.createQueueName;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.getReceiverPriority;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifyDesiredCapability;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.verifySourceCapability;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -37,6 +39,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
+import org.apache.activemq.artemis.api.core.ParameterisedAddress;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
@@ -115,6 +119,7 @@ public class DefaultSenderController implements 
SenderController {
       this.standardMessageWriter = new AMQPMessageWriter(senderContext);
       this.largeMessageWriter = new AMQPLargeMessageWriter(senderContext);
 
+      Map<String, String> addressParameters = Collections.EMPTY_MAP;
       Source source = (Source) protonSender.getRemoteSource();
       final Map<Symbol, Object> supportedFilters = new HashMap<>();
 
@@ -207,6 +212,10 @@ public class DefaultSenderController implements 
SenderController {
          }
          source.setAddress(queue.toString());
       } else {
+         final String sourceAddress = 
ParameterisedAddress.extractAddress(source.getAddress());
+
+         addressParameters = 
ParameterisedAddress.extractParameters(source.getAddress());
+
          SimpleString addressToUse;
          SimpleString queueNameToUse = null;
          shared = verifySourceCapability(source, SHARED);
@@ -215,14 +224,15 @@ public class DefaultSenderController implements 
SenderController {
          final boolean isFQQN;
 
          //find out if we have an address made up of the address and queue 
name, if yes then set queue name
-         if (CompositeAddress.isFullyQualified(source.getAddress())) {
+         if (CompositeAddress.isFullyQualified(sourceAddress)) {
             isFQQN = true;
-            addressToUse = 
SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress()));
-            queueNameToUse = 
SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress()));
+            addressToUse = 
SimpleString.toSimpleString(CompositeAddress.extractAddressName(sourceAddress));
+            queueNameToUse = 
SimpleString.toSimpleString(CompositeAddress.extractQueueName(sourceAddress));
          } else {
             isFQQN = false;
-            addressToUse = SimpleString.toSimpleString(source.getAddress());
+            addressToUse = SimpleString.toSimpleString(sourceAddress);
          }
+
          //check to see if the client has defined how we act
          boolean clientDefined = verifySourceCapability(source, 
TOPIC_CAPABILITY) || verifySourceCapability(source, QUEUE_CAPABILITY);
          if (clientDefined) {
@@ -409,16 +419,40 @@ public class DefaultSenderController implements 
SenderController {
       // have not honored what it asked for.
       source.setFilter(supportedFilters.isEmpty() ? null : supportedFilters);
 
-      boolean browseOnly = !multicast && source.getDistributionMode() != null 
&& source.getDistributionMode().equals(COPY);
+      final boolean browseOnly = !multicast && source.getDistributionMode() != 
null && source.getDistributionMode().equals(COPY);
+      final Number consumerPriority = 
getReceiverPriority(protonSender.getRemoteProperties(), 
extractConsumerPriority(addressParameters));
 
-      return (Consumer) sessionSPI.createSender(senderContext, queue, 
multicast ? null : selector, browseOnly);
+      // Any new parameters used should be extracted from the values parsed 
from the address to avoid this log message.
+      if (!addressParameters.isEmpty()) {
+         final String unusedParametersMessage = ""
+            + " Not all specified address options were applicable to the 
created server consumer."
+            + " Check the options are spelled correctly."
+            + " Unused parameters=[" + addressParameters + "].";
+
+         logger.debug(unusedParametersMessage);
+      }
+
+      return sessionSPI.createSender(senderContext, queue, multicast ? null : 
selector, browseOnly, consumerPriority);
+   }
+
+   private static Number extractConsumerPriority(Map<String, String> 
addressParameters) {
+      if (addressParameters != null && !addressParameters.isEmpty() ) {
+         final String priorityString = 
addressParameters.remove(QueueConfiguration.CONSUMER_PRIORITY);
+         if (priorityString != null) {
+            return Integer.valueOf(priorityString);
+         }
+      }
+
+      return null;
    }
 
    @Override
    public void close() throws Exception {
       Source source = (Source) protonSender.getSource();
-      if (source != null && source.getAddress() != null && multicast) {
-         SimpleString queueName = 
SimpleString.toSimpleString(source.getAddress());
+      String sourceAddress = getSourceAddress(source);
+
+      if (source != null && sourceAddress != null && multicast) {
+         SimpleString queueName = SimpleString.toSimpleString(sourceAddress);
          QueueQueryResult result = sessionSPI.queueQuery(queueName, 
routingTypeToUse, false);
          if (result.isExists() && source.getDynamic()) {
             sessionSPI.deleteQueue(queueName);
@@ -440,7 +474,7 @@ public class DefaultSenderController implements 
SenderController {
          }
       } else if (source != null && source.getDynamic() && 
(source.getExpiryPolicy() == TerminusExpiryPolicy.LINK_DETACH || 
source.getExpiryPolicy() == TerminusExpiryPolicy.SESSION_END)) {
          try {
-            
sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(source.getAddress()));
+            
sessionSPI.removeTemporaryQueue(SimpleString.toSimpleString(sourceAddress));
          } catch (Exception e) {
             // Ignore on close, its temporary anyway and will be removed later
          }
@@ -480,6 +514,14 @@ public class DefaultSenderController implements 
SenderController {
       return null;
    }
 
+   private static String getSourceAddress(Source source) {
+      if (source != null && source.getAddress() != null) {
+         return ParameterisedAddress.extractAddress(source.getAddress());
+      } else {
+         return null;
+      }
+   }
+
    private void validateConnectionState() throws ActiveMQException {
       final ProtonHandler handler = connection == null ? null : 
connection.getHandler();
       final Connection qpidConnection = handler == null ? null : 
handler.getConnection();
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupportTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupportTest.java
index 7246648a60..02a6f91b34 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupportTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupportTest.java
@@ -17,10 +17,16 @@
 
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RECEIVER_PRIORITY;
+import static 
org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.getReceiverPriority;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.Map;
+
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
@@ -152,4 +158,20 @@ public class AmqpSupportTest {
 
       assertFalse(AmqpSupport.verifyTargetCapability(target, A));
    }
+
+   @Test
+   public void testGetReceiverPriority() {
+      final Map<Symbol, Object> priorityPresent = Map.of(RECEIVER_PRIORITY, 
10);
+      final Map<Symbol, Object> priorityNotPresent = 
Map.of(Symbol.valueOf("test"), 10);
+      final Map<Symbol, Object> priorityPresentButNotNumeric = 
Map.of(RECEIVER_PRIORITY, "test");
+
+      assertEquals(10, getReceiverPriority(priorityPresent));
+      assertEquals(10, getReceiverPriority(priorityPresent, 20));
+
+      assertNull(getReceiverPriority(priorityNotPresent));
+      assertEquals(10, getReceiverPriority(priorityNotPresent, 10));
+      assertEquals(10, getReceiverPriority(null, 10));
+
+      assertThrows(ClassCastException.class, () -> 
getReceiverPriority(priorityPresentButNotNumeric));
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
index 0308976de7..86f9e59f87 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpReceiverPriorityTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
 
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -42,7 +43,6 @@ public class AmqpReceiverPriorityTest extends 
AmqpClientTestSupport {
    @Test
    @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
    public void testPriority() throws Exception {
-
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
@@ -64,6 +64,121 @@ public class AmqpReceiverPriorityTest extends 
AmqpClientTestSupport {
 
       sendMessages(getQueueName(), 5);
 
+      for (int i = 0; i < 5; i++) {
+         AmqpMessage message1 = receiver1.receiveNoWait();
+         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
+         AmqpMessage message3 = receiver3.receiveNoWait();
+         assertNotNull(message2, "did not receive message first time");
+         assertEquals("MessageID:" + i, message2.getMessageId());
+         message2.accept();
+         assertNull(message1, "message is not meant to goto lower priority 
receiver");
+         assertNull(message3, "message is not meant to goto lower priority 
receiver");
+      }
+
+      assertNoMessage(receiver1);
+      assertNoMessage(receiver3);
+
+      //Close the high priority receiver
+      receiver2.close();
+
+      sendMessages(getQueueName(), 5);
+
+      //Check messages now goto next priority receiver
+      for (int i = 0; i < 5; i++) {
+         AmqpMessage message1 = receiver1.receiveNoWait();
+         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
+         assertNotNull(message3, "did not receive message first time");
+         assertEquals("MessageID:" + i, message3.getMessageId());
+         message3.accept();
+         assertNull(message1, "message is not meant to goto lower priority 
receiver");
+      }
+
+      assertNoMessage(receiver1);
+
+      connection.close();
+   }
+
+   @Test
+   @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
+   public void testPrioritySetOnAddress() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      String queueName1 = getQueueName() + "?consumer-priority=5";
+      AmqpReceiver receiver1 = session.createReceiver(queueName1, null, false, 
false);
+      receiver1.flow(100);
+
+      String queueName2 = getQueueName() + "?consumer-priority=50";
+      AmqpReceiver receiver2 = session.createReceiver(queueName2, null, false, 
false);
+      receiver2.flow(100);
+
+      String queueName3 = getQueueName() + "?consumer-priority=10";
+      AmqpReceiver receiver3 = session.createReceiver(queueName3, null, false, 
false);
+      receiver3.flow(100);
+
+      sendMessages(getQueueName(), 5);
+
+      for (int i = 0; i < 5; i++) {
+         AmqpMessage message1 = receiver1.receiveNoWait();
+         AmqpMessage message2 = receiver2.receive(250, TimeUnit.MILLISECONDS);
+         AmqpMessage message3 = receiver3.receiveNoWait();
+         assertNotNull(message2, "did not receive message first time");
+         assertEquals("MessageID:" + i, message2.getMessageId());
+         message2.accept();
+         assertNull(message1, "message is not meant to goto lower priority 
receiver");
+         assertNull(message3, "message is not meant to goto lower priority 
receiver");
+      }
+
+      assertNoMessage(receiver1);
+      assertNoMessage(receiver3);
+
+      //Close the high priority receiver
+      receiver2.close();
+
+      sendMessages(getQueueName(), 5);
+
+      //Check messages now goto next priority receiver
+      for (int i = 0; i < 5; i++) {
+         AmqpMessage message1 = receiver1.receiveNoWait();
+         AmqpMessage message3 = receiver3.receive(250, TimeUnit.MILLISECONDS);
+         assertNotNull(message3, "did not receive message first time");
+         assertEquals("MessageID:" + i, message3.getMessageId());
+         message3.accept();
+         assertNull(message1, "message is not meant to goto lower priority 
receiver");
+      }
+
+      assertNoMessage(receiver1);
+
+      connection.close();
+   }
+
+   @Test
+   @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
+   public void testAttachPropertiesPriorityTakesPrecedenceOverAddress() throws 
Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      Map<Symbol, Object> properties1 = new HashMap<>();
+      properties1.put(Symbol.getSymbol("priority"), 5);
+      String queueName1 = getQueueName() + "?consumer-priority=50";
+      AmqpReceiver receiver1 = session.createReceiver(queueName1, null, false, 
false, properties1);
+      receiver1.flow(100);
+
+      Map<Symbol, Object> properties2 = new HashMap<>();
+      properties2.put(Symbol.getSymbol("priority"), 50);
+      String queueName2 = getQueueName() + 
"?consumer-priority=10&ingored-parameter=false";
+      AmqpReceiver receiver2 = session.createReceiver(queueName2, null, false, 
false, properties2);
+      receiver2.flow(100);
+
+      Map<Symbol, Object> properties3 = new HashMap<>();
+      properties3.put(Symbol.getSymbol("priority"), 10);
+      String queueName3 = getQueueName() + "?consumer-priority=5";
+      AmqpReceiver receiver3 = session.createReceiver(queueName3, null, false, 
false, properties3);
+      receiver3.flow(100);
+
+      sendMessages(getQueueName(), 5);
 
       for (int i = 0; i < 5; i++) {
          AmqpMessage message1 = receiver1.receiveNoWait();
@@ -75,6 +190,7 @@ public class AmqpReceiverPriorityTest extends 
AmqpClientTestSupport {
          assertNull(message1, "message is not meant to goto lower priority 
receiver");
          assertNull(message3, "message is not meant to goto lower priority 
receiver");
       }
+
       assertNoMessage(receiver1);
       assertNoMessage(receiver3);
 
@@ -92,8 +208,27 @@ public class AmqpReceiverPriorityTest extends 
AmqpClientTestSupport {
          message3.accept();
          assertNull(message1, "message is not meant to goto lower priority 
receiver");
       }
+
       assertNoMessage(receiver1);
 
+      connection.close();
+   }
+
+   @Test
+   @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
+   public void testBadValueInPriorityPropertyOnAddress() throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      String queueName1 = getQueueName() + "?consumer-priority=test";
+
+      try {
+         session.createReceiver(queueName1, null, false, false);
+         fail("Should fail to create as query string is malformed.");
+      } catch (Exception e) {
+         // expected
+      }
 
       connection.close();
    }
@@ -116,9 +251,7 @@ public class AmqpReceiverPriorityTest extends 
AmqpClientTestSupport {
       testPriorityNumber(UnsignedInteger.valueOf(5));
    }
 
-
    private void testPriorityNumber(Number number) throws Exception {
-
       AmqpClient client = createAmqpClient();
       AmqpConnection connection = addConnection(client.connect());
       AmqpSession session = connection.createSession();
@@ -130,7 +263,6 @@ public class AmqpReceiverPriorityTest extends 
AmqpClientTestSupport {
 
       sendMessages(getQueueName(), 2);
 
-
       for (int i = 0; i < 2; i++) {
          AmqpMessage message1 = receiver1.receive(3000, TimeUnit.MILLISECONDS);
          assertNotNull(message1, "did not receive message" + i);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to