This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 659b17c3a9 ARTEMIS-4745 Allow configuration of the pull consumer batch size
659b17c3a9 is described below

commit 659b17c3a933a7d80f9148836d7b9bbd4275932f
Author: Timothy Bish <tabish...@gmail.com>
AuthorDate: Fri Apr 26 11:00:26 2024 -0400

    ARTEMIS-4745 Allow configuration of the pull consumer batch size
    
    Allow configuration of the credit batch size granted to the remote peer
    by an AMQP federation queue receiver that is in pull mode, i.e. one that
    pulls messages only when there is local capacity to handle them. Some
    code housekeeping is also done here to make adding future properties a
    bit simpler and require fewer changes.
---
 .../amqp/connect/federation/AMQPFederation.java    | 35 +---------
 .../federation/AMQPFederationAddressConsumer.java  |  4 +-
 .../AMQPFederationAddressPolicyManager.java        |  2 +-
 .../federation/AMQPFederationConfiguration.java    | 21 ++++++
 .../federation/AMQPFederationConstants.java        |  8 +++
 .../AMQPFederationConsumerConfiguration.java       | 30 +++++++--
 .../federation/AMQPFederationQueueConsumer.java    | 10 ++-
 .../connect/federation/AMQPFederationSource.java   | 57 +---------------
 .../connect/federation/AMQPFederationTarget.java   | 34 +---------
 .../amqp/connect/AMQPFederationConnectTest.java    |  4 ++
 .../connect/AMQPFederationQueuePolicyTest.java     | 76 +++++++++++++++++++---
 11 files changed, 136 insertions(+), 145 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
index d2a8cf3542..1f3c818155 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederation.java
@@ -36,7 +36,6 @@ import 
org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQ
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
 import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -132,39 +131,9 @@ public abstract class AMQPFederation implements 
FederationInternal {
    public abstract AMQPSessionContext getSessionContext();
 
    /**
-    * @return the timeout before signaling an error when creating remote link 
(0 mean disable).
+    * @return the federation configuration that is in effect.
     */
-   public abstract int getLinkAttachTimeout();
-
-   /**
-    * @return the configured {@link Receiver} link credit batch size.
-    */
-   public abstract int getReceiverCredits();
-
-   /**
-    * @return the configured {@link Receiver} link credit low value.
-    */
-   public abstract int getReceiverCreditsLow();
-
-   /**
-    * @return the size in bytes before a message is considered large.
-    */
-   public abstract int getLargeMessageThreshold();
-
-   /**
-    * @return the true if the federation should ignore filters on queue 
consumers.
-    */
-   public abstract boolean isIgnoreQueueConsumerFilters();
-
-   /**
-    * @return the true if the federation should ignore priorities on queue 
consumers.
-    */
-   public abstract boolean isIgnoreQueueConsumerPriorities();
-
-   /**
-    * @return the true if the federation should support core message tunneling.
-    */
-   public abstract boolean isCoreMessageTunnelingEnabled();
+   public abstract AMQPFederationConfiguration getConfiguration();
 
    @Override
    public final synchronized void start() throws ActiveMQException {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
index 777452c708..2e8346a63f 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressConsumer.java
@@ -321,11 +321,11 @@ public class AMQPFederationAddressConsumer implements 
FederationConsumerInternal
             final ScheduledFuture<?> openTimeoutTask;
             final AtomicBoolean openTimedOut = new AtomicBoolean(false);
 
-            if (federation.getLinkAttachTimeout() > 0) {
+            if (configuration.getLinkAttachTimeout() > 0) {
                openTimeoutTask = 
federation.getServer().getScheduledPool().schedule(() -> {
                   openTimedOut.set(true);
                   
federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
-               }, federation.getLinkAttachTimeout(), TimeUnit.SECONDS);
+               }, configuration.getLinkAttachTimeout(), TimeUnit.SECONDS);
             } else {
                openTimeoutTask = null;
             }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
index f6bc4f9e2d..ca5ce721a9 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationAddressPolicyManager.java
@@ -99,7 +99,7 @@ public class AMQPFederationAddressPolicyManager extends 
FederationAddressPolicyM
 
       // Address consumers can't pull as we have no real metric to indicate 
when / how much
       // we should pull so instead we refuse to match if credit set to zero.
-      if (federation.getReceiverCredits() <= 0) {
+      if (federation.getConfiguration().getReceiverCredits() <= 0) {
          logger.debug("Federation address policy rejecting match on {} because 
credit is set to zero:", addressInfo.getName());
          return false;
       } else {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java
index f549878323..a27443d690 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConfiguration.java
@@ -19,6 +19,7 @@ package 
org.apache.activemq.artemis.protocol.amqp.connect.federation;
 
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_FILTERS;
@@ -47,6 +48,11 @@ public final class AMQPFederationConfiguration {
     */
    public static final int DEFAULT_LINK_ATTACH_TIMEOUT = 30;
 
+   /**
+    * Default credits granted to a receiver that is in pull mode.
+    */
+   public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100;
+
    /**
     * Default value for the core message tunneling feature that indicates if 
core protocol messages
     * should be streamed as binary blobs as the payload of an custom AMQP 
message which avoids any
@@ -112,6 +118,20 @@ public final class AMQPFederationConfiguration {
       }
    }
 
+   /**
+    * @return the credit batch size offered to a {@link Receiver} link that is 
in pull mode.
+    */
+   public int getPullReceiverBatchSize() {
+      final Object property = properties.get(PULL_RECEIVER_BATCH_SIZE);
+      if (property instanceof Number) {
+         return ((Number) property).intValue();
+      } else if (property instanceof String) {
+         return Integer.parseInt((String) property);
+      } else {
+         return DEFAULT_PULL_CREDIT_BATCH_SIZE;
+      }
+   }
+
    /**
     * @return the size in bytes of an incoming message after which the {@link 
Receiver} treats it as large.
     */
@@ -193,6 +213,7 @@ public final class AMQPFederationConfiguration {
 
       configMap.put(RECEIVER_CREDITS, getReceiverCredits());
       configMap.put(RECEIVER_CREDITS_LOW, getReceiverCreditsLow());
+      configMap.put(PULL_RECEIVER_BATCH_SIZE, getPullReceiverBatchSize());
       configMap.put(LARGE_MESSAGE_THRESHOLD, getLargeMessageThreshold());
       configMap.put(LINK_ATTACH_TIMEOUT, getLinkAttachTimeout());
       configMap.put(IGNORE_QUEUE_CONSUMER_FILTERS, 
isIgnoreSubscriptionFilters());
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
index 30e94a2be9..85183c121d 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConstants.java
@@ -77,6 +77,14 @@ public final class AMQPFederationConstants {
     */
    public static final String RECEIVER_CREDITS_LOW = "amqpLowCredits";
 
+   /**
+    * Configuration property that defines the amount of credits to batch to an 
AMQP receiver link
+    * and the top up value when sending more credit once the broker has 
capacity available for
+    * them. this can be sent to the peer so that dual federation 
configurations share the same
+    * configuration on both sides of the connection.
+    */
+   public static final String PULL_RECEIVER_BATCH_SIZE = 
"amqpPullConsumerCredits";
+
    /**
     * Configuration property used to convey the local side value to use when 
considering if a message
     * is a large message, this can be sent to the peer so that dual federation 
configurations share
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java
index 0f5dfd0c1c..cced975129 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationConsumerConfiguration.java
@@ -21,6 +21,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.IGNORE_QUEUE_CONSUMER_PRIORITIES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LARGE_MESSAGE_THRESHOLD;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.RECEIVER_CREDITS_LOW;
 
@@ -29,6 +30,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.qpid.proton.engine.Receiver;
 
 /**
  * Configuration options applied to a consumer created from federation policies
@@ -59,7 +61,7 @@ public final class AMQPFederationConsumerConfiguration {
       } else if (property instanceof String) {
          return Integer.parseInt((String) property);
       } else {
-         return federation.getReceiverCredits();
+         return federation.getConfiguration().getReceiverCredits();
       }
    }
 
@@ -70,7 +72,21 @@ public final class AMQPFederationConsumerConfiguration {
       } else if (property instanceof String) {
          return Integer.parseInt((String) property);
       } else {
-         return federation.getReceiverCreditsLow();
+         return federation.getConfiguration().getReceiverCreditsLow();
+      }
+   }
+
+   /**
+    * @return the credit batch size offered to a {@link Receiver} link that is 
in pull mode.
+    */
+   public int getPullReceiverBatchSize() {
+      final Object property = properties.get(PULL_RECEIVER_BATCH_SIZE);
+      if (property instanceof Number) {
+         return ((Number) property).intValue();
+      } else if (property instanceof String) {
+         return Integer.parseInt((String) property);
+      } else {
+         return federation.getConfiguration().getPullReceiverBatchSize();
       }
    }
 
@@ -81,7 +97,7 @@ public final class AMQPFederationConsumerConfiguration {
       } else if (property instanceof String) {
          return Integer.parseInt((String) property);
       } else {
-         return federation.getLargeMessageThreshold();
+         return federation.getConfiguration().getLargeMessageThreshold();
       }
    }
 
@@ -92,7 +108,7 @@ public final class AMQPFederationConsumerConfiguration {
       } else if (property instanceof String) {
          return Integer.parseInt((String) property);
       } else {
-         return federation.getLinkAttachTimeout();
+         return federation.getConfiguration().getLinkAttachTimeout();
       }
    }
 
@@ -103,7 +119,7 @@ public final class AMQPFederationConsumerConfiguration {
       } else if (property instanceof String) {
          return Boolean.parseBoolean((String) property);
       } else {
-         return federation.isCoreMessageTunnelingEnabled();
+         return federation.getConfiguration().isCoreMessageTunnelingEnabled();
       }
    }
 
@@ -114,7 +130,7 @@ public final class AMQPFederationConsumerConfiguration {
       } else if (property instanceof String) {
          return Boolean.parseBoolean((String) property);
       } else {
-         return federation.isIgnoreQueueConsumerFilters();
+         return federation.getConfiguration().isIgnoreSubscriptionFilters();
       }
    }
 
@@ -125,7 +141,7 @@ public final class AMQPFederationConsumerConfiguration {
       } else if (property instanceof String) {
          return Boolean.parseBoolean((String) property);
       } else {
-         return federation.isIgnoreQueueConsumerPriorities();
+         return federation.getConfiguration().isIgnoreSubscriptionPriorities();
       }
    }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
index 086ce23c01..03fec69e54 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationQueueConsumer.java
@@ -91,8 +91,6 @@ public class AMQPFederationQueueConsumer implements 
FederationConsumerInternal {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   public static final int DEFAULT_PULL_CREDIT_BATCH_SIZE = 100;
-
    public static final int DEFAULT_PENDING_MSG_CHECK_BACKOFF_MULTIPLIER = 2;
    public static final int DEFAULT_PENDING_MSG_CHECK_MAX_DELAY = 30;
 
@@ -315,11 +313,11 @@ public class AMQPFederationQueueConsumer implements 
FederationConsumerInternal {
             final ScheduledFuture<?> openTimeoutTask;
             final AtomicBoolean openTimedOut = new AtomicBoolean(false);
 
-            if (federation.getLinkAttachTimeout() > 0) {
+            if (configuration.getLinkAttachTimeout() > 0) {
                openTimeoutTask = 
federation.getServer().getScheduledPool().schedule(() -> {
                   openTimedOut.set(true);
                   
federation.signalResourceCreateError(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout());
-               }, federation.getLinkAttachTimeout(), TimeUnit.SECONDS);
+               }, configuration.getLinkAttachTimeout(), TimeUnit.SECONDS);
             } else {
                openTimeoutTask = null;
             }
@@ -516,7 +514,7 @@ public class AMQPFederationQueueConsumer implements 
FederationConsumerInternal {
          // credit. This also allows consumers created on the remote side of a 
federation connection
          // to read from properties sent from the federation source that 
indicate the values that are
          // configured on the local side.
-         if (federation.getReceiverCredits() > 0) {
+         if (configuration.getReceiverCredits() > 0) {
             return createCreditRunnable(configuration.getReceiverCredits(), 
configuration.getReceiverCreditsLow(), receiver, connection, this);
          } else {
             return this::checkIfCreditTopUpNeeded;
@@ -579,7 +577,7 @@ public class AMQPFederationQueueConsumer implements 
FederationConsumerInternal {
             return; // Closed before this was triggered.
          }
 
-         receiver.flow(DEFAULT_PULL_CREDIT_BATCH_SIZE);
+         receiver.flow(configuration.getPullReceiverBatchSize());
          connection.instantFlush();
          lastBacklogCheckDelay = 0;
          creditTopUpInProgress.set(false);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
index d68fc6aa8c..96fbdbe9a3 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationSource.java
@@ -124,11 +124,6 @@ public class AMQPFederationSource extends AMQPFederation {
       return brokerConnection;
    }
 
-   @Override
-   public int getLinkAttachTimeout() {
-      return configuration.getLinkAttachTimeout();
-   }
-
    @Override
    public synchronized AMQPSessionContext getSessionContext() {
       if (!connected) {
@@ -148,58 +143,12 @@ public class AMQPFederationSource extends AMQPFederation {
    }
 
    @Override
-   public synchronized int getReceiverCredits() {
-      if (!connected) {
-         throw new IllegalStateException("Cannot access connection 
configuration, federation is not connected");
-      }
-
-      return configuration.getReceiverCredits();
-   }
-
-   @Override
-   public synchronized int getReceiverCreditsLow() {
+   public synchronized AMQPFederationConfiguration getConfiguration() {
       if (!connected) {
-         throw new IllegalStateException("Cannot access connection 
configuration, federation is not connected");
-      }
-
-      return configuration.getReceiverCreditsLow();
-   }
-
-   @Override
-   public synchronized int getLargeMessageThreshold() {
-      if (!connected) {
-         throw new IllegalStateException("Cannot access connection 
configuration, federation is not connected");
-      }
-
-      return configuration.getLargeMessageThreshold();
-   }
-
-   @Override
-   public boolean isCoreMessageTunnelingEnabled() {
-      if (!connected) {
-         throw new IllegalStateException("Cannot access connection 
configuration, federation is not connected");
-      }
-
-      return configuration.isCoreMessageTunnelingEnabled();
-   }
-
-
-   @Override
-   public boolean isIgnoreQueueConsumerFilters() {
-      if (!connected) {
-         throw new IllegalStateException("Cannot access connection 
configuration, federation is not connected");
-      }
-
-      return configuration.isIgnoreSubscriptionFilters();
-   }
-
-   @Override
-   public boolean isIgnoreQueueConsumerPriorities() {
-      if (!connected) {
-         throw new IllegalStateException("Cannot access connection 
configuration, federation is not connected");
+         throw new IllegalStateException("Cannot access connection while 
federation is not connected");
       }
 
-      return configuration.isIgnoreSubscriptionPriorities();
+      return configuration;
    }
 
    /**
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java
index 581dd47745..e13527eb9b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/federation/AMQPFederationTarget.java
@@ -67,38 +67,8 @@ public class AMQPFederationTarget extends AMQPFederation {
    }
 
    @Override
-   public int getReceiverCredits() {
-      return configuration.getReceiverCredits();
-   }
-
-   @Override
-   public int getReceiverCreditsLow() {
-      return configuration.getReceiverCreditsLow();
-   }
-
-   @Override
-   public int getLargeMessageThreshold() {
-      return configuration.getLargeMessageThreshold();
-   }
-
-   @Override
-   public int getLinkAttachTimeout() {
-      return configuration.getLinkAttachTimeout();
-   }
-
-   @Override
-   public boolean isCoreMessageTunnelingEnabled() {
-      return configuration.isCoreMessageTunnelingEnabled();
-   }
-
-   @Override
-   public boolean isIgnoreQueueConsumerFilters() {
-      return configuration.isIgnoreSubscriptionFilters();
-   }
-
-   @Override
-   public boolean isIgnoreQueueConsumerPriorities() {
-      return configuration.isIgnoreSubscriptionPriorities();
+   public synchronized AMQPFederationConfiguration getConfiguration() {
+      return configuration;
    }
 
    @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
index 11bcf8c642..13b2619ed5 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationConnectTest.java
@@ -34,6 +34,7 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.LINK_ATTACH_TIMEOUT;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.OPERATION_TYPE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_NAME;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_EXCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDES;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.QUEUE_INCLUDE_FEDERATED;
@@ -141,6 +142,7 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       final int AMQP_MIN_LARGE_MESSAGE_SIZE = 10_000;
       final int AMQP_CREDITS = 100;
       final int AMQP_CREDITS_LOW = 50;
+      final int AMQP_PULL_CREDITS_BATCH = 50;
       final int AMQP_LINK_ATTACH_TIMEOUT = 60;
       final boolean AMQP_TUNNEL_CORE_MESSAGES = false;
       final boolean AMQP_INGNORE_CONSUMER_FILTERS = false;
@@ -149,6 +151,7 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
       final Map<String, Object> federationConfiguration = new HashMap<>();
       federationConfiguration.put(RECEIVER_CREDITS, AMQP_CREDITS);
       federationConfiguration.put(RECEIVER_CREDITS_LOW, AMQP_CREDITS_LOW);
+      federationConfiguration.put(PULL_RECEIVER_BATCH_SIZE, 
AMQP_PULL_CREDITS_BATCH);
       federationConfiguration.put(LARGE_MESSAGE_THRESHOLD, 
AMQP_MIN_LARGE_MESSAGE_SIZE);
       federationConfiguration.put(LINK_ATTACH_TIMEOUT, 
AMQP_LINK_ATTACH_TIMEOUT);
       federationConfiguration.put(IGNORE_QUEUE_CONSUMER_FILTERS, 
AMQP_INGNORE_CONSUMER_FILTERS);
@@ -183,6 +186,7 @@ public class AMQPFederationConnectTest extends 
AmqpClientTestSupport {
          final AMQPFederatedBrokerConnectionElement federation = new 
AMQPFederatedBrokerConnectionElement("myFederation");
          federation.addProperty(LINK_ATTACH_TIMEOUT, AMQP_LINK_ATTACH_TIMEOUT);
          federation.addProperty(AmqpSupport.TUNNEL_CORE_MESSAGES, 
Boolean.toString(AMQP_TUNNEL_CORE_MESSAGES));
+         federation.addProperty(PULL_RECEIVER_BATCH_SIZE, 
AMQP_PULL_CREDITS_BATCH);
          amqpConnection.addElement(federation);
          server.getConfiguration().addAMQPConnection(amqpConnection);
          server.start();
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index 3a987e8b5e..4cb32f385d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -41,8 +41,9 @@ import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_CLASS_NAME;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.TRANSFORMER_PROPERTIES_MAP;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.POLICY_PROPERTIES_MAP;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.PULL_RECEIVER_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport.DEFAULT_QUEUE_RECEIVER_PRIORITY_ADJUSTMENT;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueueConsumer.DEFAULT_PULL_CREDIT_BATCH_SIZE;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConfiguration.DEFAULT_PULL_CREDIT_BATCH_SIZE;
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.allOf;
@@ -2566,7 +2567,29 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
    }
 
    @Test(timeout = 20000)
-   public void testPullQueueConsumerGrantsCreditOnEmptyQueue() throws 
Exception {
+   public void testPullQueueConsumerGrantsDefaultCreditOnEmptyQueue() throws 
Exception {
+      doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(0, false, 0, false, 
DEFAULT_PULL_CREDIT_BATCH_SIZE);
+   }
+
+   @Test(timeout = 20000)
+   public void 
testPullQueueConsumerGrantsReceiverConfiguredCreditOnEmptyQueue() throws 
Exception {
+      doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(0, false, 10, true, 
10);
+   }
+
+   @Test(timeout = 20000)
+   public void 
testPullQueueConsumerGrantsFederationConfiguredCreditOnEmptyQueue() throws 
Exception {
+      doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(20, true, 0, false, 
20);
+   }
+
+   @Test(timeout = 20000)
+   public void 
testPullQueueConsumerGrantsReceiverConfiguredCreditOverFederationConfiguredOnEmptyQueue()
 throws Exception {
+      doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(20, true, 10, true, 
10);
+   }
+
+   private void doTestPullConsumerGrantsConfiguredCreditOnEmptyQueue(int 
globalBatch, boolean setGlobal,
+                                                                     int 
receiverBatch, boolean setReceiver,
+                                                                     int 
expected) throws Exception {
+
       try (ProtonTestServer peer = new ProtonTestServer()) {
          peer.expectSASLAnonymousConnect();
          peer.expectOpen().respond();
@@ -2586,14 +2609,20 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
          final AMQPFederationQueuePolicyElement receiveFromQueue = new 
AMQPFederationQueuePolicyElement();
          receiveFromQueue.setName("queue-policy");
          receiveFromQueue.addToIncludes("test", "test");
+         receiveFromQueue.addProperty(RECEIVER_CREDITS, 0);
+         if (setReceiver) {
+            receiveFromQueue.addProperty(PULL_RECEIVER_BATCH_SIZE, 
receiverBatch);
+         }
 
          final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
          element.setName(getTestName());
          element.addLocalQueuePolicy(receiveFromQueue);
+         if (setGlobal) {
+            element.addProperty(PULL_RECEIVER_BATCH_SIZE, globalBatch);
+         }
 
          final AMQPBrokerConnectConfiguration amqpConnection =
-            new AMQPBrokerConnectConfiguration(
-               getTestName(), "tcp://" + remoteURI.getHost() + ":" + 
remoteURI.getPort() + "?amqpCredits=0");
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
          amqpConnection.setReconnectAttempts(0);// No reconnects
          amqpConnection.addElement(element);
 
@@ -2611,7 +2640,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
                                             containsString("queue-receiver"),
                                             containsString(server.getNodeID().toString())))
                             .respondInKind();
-         peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE);
+         peer.expectFlow().withLinkCredit(expected);
 
          final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
 
@@ -2706,6 +2735,27 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
 
    @Test(timeout = 30000)
    public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrain() throws Exception {
+      doTestPullConsumerCreditTopUpAfterEachBacklogDrain(0, false, 0, false, DEFAULT_PULL_CREDIT_BATCH_SIZE);
+   }
+
+   @Test(timeout = 30000)
+   public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainFederationConfigured() throws Exception {
+      doTestPullConsumerCreditTopUpAfterEachBacklogDrain(10, true, 0, false, 10);
+   }
+
+   @Test(timeout = 30000)
+   public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainPolicyConfigured() throws Exception {
+      doTestPullConsumerCreditTopUpAfterEachBacklogDrain(0, false, 20, true, 20);
+   }
+
+   @Test(timeout = 30000)
+   public void testPullQueueConsumerBatchCreditTopUpAfterEachBacklogDrainBothConfigured() throws Exception {
+      doTestPullConsumerCreditTopUpAfterEachBacklogDrain(100, true, 20, true, 20);
+   }
+
+   private void doTestPullConsumerCreditTopUpAfterEachBacklogDrain(int globalBatch, boolean setGlobal,
+                                                                   int receiverBatch, boolean setReceiver,
+                                                                   int expected) throws Exception {
       try (ProtonTestServer peer = new ProtonTestServer()) {
          peer.expectSASLAnonymousConnect();
          peer.expectOpen().respond();
@@ -2727,10 +2777,16 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
          final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
          receiveFromQueue.setName("queue-policy");
          receiveFromQueue.addToIncludes("test", "test");
+         if (setReceiver) {
+            receiveFromQueue.addProperty(PULL_RECEIVER_BATCH_SIZE, receiverBatch);
+         }
 
          final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
          element.setName(getTestName());
          element.addLocalQueuePolicy(receiveFromQueue);
+         if (setGlobal) {
+            element.addProperty(PULL_RECEIVER_BATCH_SIZE, globalBatch);
+         }
 
          final AMQPBrokerConnectConfiguration amqpConnection =
             new AMQPBrokerConnectConfiguration(
@@ -2769,7 +2825,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
             connection.start();
 
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE);
+            peer.expectFlow().withLinkCredit(expected);
 
             // Remove the backlog and credit should be offered to the remote
             assertNotNull(consumer.receiveNoWait());
@@ -2777,7 +2833,7 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
             peer.waitForScriptToComplete(20, TimeUnit.SECONDS);
 
             // Consume all the credit that was presented in the batch
-            for (int i = 0; i < DEFAULT_PULL_CREDIT_BATCH_SIZE; ++i) {
+            for (int i = 0; i < expected; ++i) {
                peer.expectDisposition().withState().accepted();
                peer.remoteTransfer().withBody().withString("test-message")
                                     .also()
@@ -2785,19 +2841,19 @@ public class AMQPFederationQueuePolicyTest extends AmqpClientTestSupport {
                                     .now();
             }
 
-            Wait.assertTrue(() -> server.queueQuery(queueName).getMessageCount() == DEFAULT_PULL_CREDIT_BATCH_SIZE, 10_000);
+            Wait.assertTrue(() -> server.queueQuery(queueName).getMessageCount() == expected, 10_000);
 
             // Consume all the newly received message from the remote except one
             // which should leave the queue with a pending message so no credit
             // should be offered.
-            for (int i = 0; i < DEFAULT_PULL_CREDIT_BATCH_SIZE - 1; ++i) {
+            for (int i = 0; i < expected - 1; ++i) {
                assertNotNull(consumer.receiveNoWait());
             }
 
             // We should not get a new batch yet as there is still one pending
             // message on the local queue we have not consumed.
             peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
-            peer.expectFlow().withLinkCredit(DEFAULT_PULL_CREDIT_BATCH_SIZE);
+            peer.expectFlow().withLinkCredit(expected);
 
             // Remove the backlog and credit should be offered to the remote again
             assertNotNull(consumer.receiveNoWait());


Reply via email to