This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new f30029f04a ARTEMIS-5645 Drain the receiver link if address is 
rejecting messages
f30029f04a is described below

commit f30029f04a87202ac8e928b425080eb501446686
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Sep 3 10:17:09 2025 -0400

    ARTEMIS-5645 Drain the receiver link if address is rejecting messages
    
    When an address policy limits the incoming messages and starts to reject 
them
    the AMQP link should drain remote credit to limit the remote from sending 
new
    messages until the address has space again in which case new credit will be
    granted.
---
 .../amqp/broker/ProtonProtocolManager.java         |   47 +-
 .../artemis/protocol/amqp/proton/AmqpSupport.java  |    6 +
 .../amqp/proton/ProtonAbstractReceiver.java        |  188 +++-
 .../amqp/proton/ProtonServerReceiverContext.java   |   74 --
 .../proton/ProtonServerReceiverContextTest.java    |  121 ++-
 .../amqp/AmqpDrainOnNoSpaceForSendsTest.java       | 1091 ++++++++++++++++++++
 .../connect/AMQPFederationAddressPolicyTest.java   |    6 +
 .../connect/AMQPFederationQueuePolicyTest.java     |  102 ++
 8 files changed, 1535 insertions(+), 100 deletions(-)

diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index af26b5f8a5..b5f682a717 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -96,6 +96,8 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
    private boolean amqpUseModifiedForTransientDeliveryErrors = 
AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
 
+   private boolean amqpDrainOnTransientDeliveryErrors = 
AmqpSupport.AMQP_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
+
    // If set true, a reject disposition will be treated as if it were an 
unmodified disposition with the
    // delivery-failed flag set true.
    private boolean amqpTreatRejectAsUnmodifiedDeliveryFailed = 
AmqpSupport.AMQP_TREAT_REJECT_AS_UNMODIFIED_DELIVERY_FAILURE;
@@ -106,6 +108,8 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
    private String saslLoginConfigScope = "amqp-sasl-gssapi";
 
+   private int amqpLinkQuiesceTimeout = AmqpSupport.AMQP_LINK_QUIESCE_TIMEOUT;
+
    private Long amqpIdleTimeout;
 
    private long ackManagerFlushTimeout = 10_000;
@@ -449,7 +453,6 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
       return this;
    }
 
-
    public void setAmqpTreatRejectAsUnmodifiedDeliveryFailed(final boolean 
amqpTreatRejectAsUnmodifiedDeliveryFailed) {
       this.amqpTreatRejectAsUnmodifiedDeliveryFailed = 
amqpTreatRejectAsUnmodifiedDeliveryFailed;
    }
@@ -457,4 +460,46 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
    public boolean isAmqpTreatRejectAsUnmodifiedDeliveryFailed() {
       return this.amqpTreatRejectAsUnmodifiedDeliveryFailed;
    }
+
+   /**
+    * {@return true if transient delivery errors should be handled by draining 
link credit from the remote sender}
+    */
+   public boolean isDrainOnTransientDeliveryErrors() {
+      return this.amqpDrainOnTransientDeliveryErrors;
+   }
+
+   /**
+    * Sets if transient delivery errors should be handled by draining link 
credit from the remote sender
+    *
+    * @param amqpDrainOnTransientDeliveryErrors
+    *    Set to <code>true</code> if senders should be drained on transient 
delivery errors.
+    *
+    * @return this protocol manager instance.
+    */
+   public ProtonProtocolManager setAmqpDrainOnTransientDeliveryErrors(boolean 
amqpDrainOnTransientDeliveryErrors) {
+      this.amqpDrainOnTransientDeliveryErrors = 
amqpDrainOnTransientDeliveryErrors;
+      return this;
+   }
+
+   /**
+    * {@return the time in milliseconds to wait for remote sender once a link 
quiesce is initiated by this peer}
+    */
+   public int getLinkQuiesceTimeout() {
+      return amqpLinkQuiesceTimeout;
+   }
+
+   /**
+    * Sets the time in milliseconds to wait before closing a remote sender 
link if the server has requested
+    * that the link drain all outstanding credit an complete pending 
settlements. A value less than or equal
+    * to zero disables drain timeouts.
+    *
+    * @param amqpLinkQuiesceTimeout
+    *    The time in milliseconds to wait for quiesce to complete.
+    *
+    * @return this protocol manager instance.
+    */
+   public ProtonProtocolManager setAmqpLinkQuiesceTimeout(int 
amqpLinkQuiesceTimeout) {
+      this.amqpLinkQuiesceTimeout = amqpLinkQuiesceTimeout;
+      return this;
+   }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index f5ce4cc1a8..c0eab92fb9 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -49,6 +49,12 @@ public class AmqpSupport {
    // Defaults for controlling the behaviour of AMQP dispositions
    public static final boolean AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS 
= false;
 
+   // Defaults for controlling the behaviour of AMQP credit draining on 
resource exhaustion
+   public static final boolean AMQP_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS = true;
+
+   // Default timeout (in milliseconds) before closing a link as failed if a 
quiesce is initiated from this peer
+   public static final int AMQP_LINK_QUIESCE_TIMEOUT = 600_000; // ten minutes
+
    // Identification values used to locating JMS selector types.
    public static final Symbol JMS_SELECTOR_KEY = 
Symbol.valueOf("jms-selector");
    public static final UnsignedLong JMS_SELECTOR_CODE = 
UnsignedLong.valueOf(0x0000468C00000004L);
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
index 104350c94c..71cea48afc 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
@@ -20,11 +20,15 @@ import static 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessa
 import static 
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
 
 import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
 import java.util.Objects;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -35,10 +39,17 @@ import 
org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.transaction.TransactionalState;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.LinkError;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 import org.slf4j.Logger;
@@ -50,6 +61,7 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
 
    protected enum ReceiverState {
       STARTED,
+      DRAINING,
       STOPPING,
       STOPPED,
       CLOSED
@@ -67,13 +79,15 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
    protected final MessageReader largeMessageReader = new 
AMQPLargeMessageReader(this);
    protected final Runnable creditRunnable;
    protected final boolean useModified;
+   protected final boolean drainCreditOnNoSpace;
+   protected final long drainTimeout;
    protected final Runnable creditTopUpRunner = this::doCreditTopUpRun;
 
    protected volatile MessageReader messageReader;
    protected int pendingSettles = 0;
    protected volatile ReceiverState state = ReceiverState.STARTED;
    protected BiConsumer<ProtonAbstractReceiver, Boolean> pendingStop;
-   protected ScheduledFuture<?> pendingStopTimeout;
+   protected ScheduledFuture<?> pendingQuiesceTimeout;
 
    // Not always used so left unallocated until needed, on attach the 
capabilities drive if supported
    protected MessageReader coreMessageReader;
@@ -90,7 +104,9 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
       this.receiver = receiver;
       this.minLargeMessageSize = getConfiguredMinLargeMessageSize(connection);
       this.creditRunnable = createCreditRunnable(connection);
-      this.useModified = 
this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
+      this.useModified = 
connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
+      this.drainCreditOnNoSpace = 
connection.getProtocolManager().isDrainOnTransientDeliveryErrors();
+      this.drainTimeout = 
connection.getProtocolManager().getLinkQuiesceTimeout();
       this.routingContext = new 
RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
    }
 
@@ -128,25 +144,38 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
     * an asynchronous call invalid. The stop call allows a timeout to be 
specified which will signal the stopped
     * consumer if the timeout elapses and leaves the receiver in the stopping 
state which does not allow for a restart.
     *
-    * @param stopTimeout A time in milliseconds to wait for the stop to 
complete before considering it as having
-    *                    failed.
-    * @param onStopped   A consumer that is signaled once the receiver has 
stopped or the timeout elapsed.
+    * @param stopTimeout
+    *    A time in milliseconds to wait for the stop to complete before 
considering it as having failed.
+    * @param onStopped
+    *    A consumer that is signaled once the receiver has stopped or the 
timeout elapsed.
+    *
     * @throws IllegalStateException if the receiver is currently in the 
stopping state.
     */
    public void stop(int stopTimeout, BiConsumer<ProtonAbstractReceiver, 
Boolean> onStopped) {
       Objects.requireNonNull(onStopped, "The stopped callback must not be 
null");
       connection.requireInHandler();
 
-      if (isStarted()) {
+      if (state.ordinal() < ReceiverState.STOPPING.ordinal()) {
          state = ReceiverState.STOPPING;
          pendingStop = onStopped;
-         if (!checkIfPendingStopCanComplete()) {
-            if (receiver.getCredit() != 0) {
+
+         if (!tryCompletePendingQuiesce()) {
+            // The receiver could already be draining but if not we trigger a 
drain to handle
+            // the run off of pending messages from the remote before 
reporting stopped.
+            if (receiver.getCredit() != 0 && !receiver.draining()) {
                receiver.drain(0);
             }
 
             if (stopTimeout > 0) {
-               pendingStopTimeout = 
protonSession.getServer().getScheduledPool().schedule(() -> {
+               // Take over any pending timeout setup from a drain operation 
with one with the given
+               // stop timeout otherwise the pending drain timeout will kick 
in and will signal the
+               // stop callback indicating that stop failed.
+               if (pendingQuiesceTimeout != null) {
+                  pendingQuiesceTimeout.cancel(false);
+                  pendingQuiesceTimeout = null;
+               }
+
+               pendingQuiesceTimeout = 
protonSession.getServer().getScheduledPool().schedule(() -> {
                   connection.runNow(() -> signalStoppedCallback(false));
                }, stopTimeout, TimeUnit.MILLISECONDS);
             }
@@ -183,8 +212,8 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
       return state == ReceiverState.STARTED;
    }
 
-   public boolean isBusy() {
-      return false;
+   public boolean isDraining() {
+      return state == ReceiverState.DRAINING;
    }
 
    public boolean isStopping() {
@@ -199,6 +228,10 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
       return state == ReceiverState.CLOSED;
    }
 
+   public boolean isBusy() {
+      return false;
+   }
+
    /**
     * Set the proper operation context in the Thread Local.
     *  Return the old context*/
@@ -245,6 +278,7 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
     * @param receiver   The proton receiver that will have its credit refilled
     * @param connection The connection that own the receiver
     * @param context    The context that will be associated with the receiver
+    *
     * @return A new Runnable that can be used to keep receiver credit 
replenished
     */
    public static Runnable createCreditRunnable(int refill,
@@ -320,16 +354,27 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
       if (isStarted()) {
          topUpCreditIfNeeded();
       } else {
-         checkIfPendingStopCanComplete();
+         tryCompletePendingQuiesce();
       }
    }
 
-   private boolean checkIfPendingStopCanComplete() {
-      if (isStopping() && pendingSettles == 0 && receiver.getQueued() == 0 && 
receiver.getCredit() == 0) {
-         state = ReceiverState.STOPPED;
-         signalStoppedCallback(true);
+   private boolean tryCompletePendingQuiesce() {
+      if (pendingSettles == 0 && receiver.getQueued() == 0 && 
receiver.getCredit() == 0) {
+         if (isStopping()) {
+            state = ReceiverState.STOPPED;
+            signalStoppedCallback(true);
+            return true;
+         } else if (isDraining()) {
+            state = ReceiverState.STARTED;
+
+            if (pendingQuiesceTimeout != null) {
+               pendingQuiesceTimeout.cancel(false);
+               pendingQuiesceTimeout = null;
+            }
 
-         return true;
+            topUpCreditIfNeeded();
+            return true;
+         }
       }
 
       return false;
@@ -337,10 +382,10 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
 
    @Override
    public void onFlow(int credits, boolean drain) {
-      if (isStopping()) {
-         checkIfPendingStopCanComplete();
-      } else {
+      if (isStarted()) {
          topUpCreditIfNeeded();
+      } else {
+         tryCompletePendingQuiesce();
       }
    }
 
@@ -359,7 +404,7 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
       if (!receiver.getDrain() && isStarted()) {
          receiver.flow(1);
       } else {
-         checkIfPendingStopCanComplete();
+         tryCompletePendingQuiesce();
       }
    }
 
@@ -477,6 +522,47 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
       });
    }
 
+   public void deliveryFailed(Delivery delivery, Receiver receiver, Exception 
e) {
+      connection.runNow(() -> {
+         if (drainCreditOnNoSpace && isStarted() && receiver.getCredit() > 0 
&& isAddressFull(e)) {
+            // Only when started do we want to drain off credit when the 
address is full since we
+            // will have already reacted to a stop or close. Once drained this 
will return to the
+            // started state if not stopped or closed since.
+            state = ReceiverState.DRAINING;
+            receiver.drain(0);
+
+            if (drainTimeout > 0) {
+               pendingQuiesceTimeout = 
protonSession.getServer().getScheduledPool().schedule(() -> {
+                  final ErrorCondition error = new 
ErrorCondition(LinkError.DETACH_FORCED, "Timed out waiting for remote sender to 
drain");
+
+                  connection.runLater(() -> {
+                     try {
+                        // Accounts for possible swap to stopping where the 
caller did not provide their
+                        // own timeout so the stop will leave the drain 
timeout in place if one was active.
+                        if (isStopping()) {
+                           pendingStop.accept(this, false);
+                        } else {
+                           try {
+                              close(error);
+                           } finally {
+                              receiver.close();
+                           }
+                        }
+                     } catch (ActiveMQAMQPException ex) {
+                        logger.debug("Error while attempting to close receiver 
that did not drain or stop in time", ex);
+                     } finally {
+                        connection.flush();
+                     }
+                  });
+               }, drainTimeout, TimeUnit.MILLISECONDS);
+            }
+         }
+         delivery.disposition(determineDeliveryState(((Source) 
receiver.getSource()), useModified, e));
+         settle(delivery);
+         connection.flush();
+      });
+   }
+
    /**
     * {@return either the fixed address assigned to this sender, or the last 
address used by an anonymous relay sender;
     * if this is an anonymous relay and no send has occurred then this method 
returns {@code null}}
@@ -509,9 +595,9 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
    }
 
    private void signalStoppedCallback(boolean stopped) {
-      if (pendingStopTimeout != null) {
-         pendingStopTimeout.cancel(false);
-         pendingStopTimeout = null;
+      if (pendingQuiesceTimeout != null) {
+         pendingQuiesceTimeout.cancel(false);
+         pendingQuiesceTimeout = null;
       }
 
       if (pendingStop != null) {
@@ -556,6 +642,60 @@ public abstract class ProtonAbstractReceiver extends 
ProtonInitializable impleme
       }
    }
 
+   protected static boolean isAddressFull(final Exception e) {
+      return e instanceof ActiveMQException amqe && 
ActiveMQExceptionType.ADDRESS_FULL.equals(amqe.getType());
+   }
+
+   protected static boolean outcomeSupported(final Source source, final Symbol 
outcome) {
+      if (source != null && source.getOutcomes() != null) {
+         return Arrays.asList((source).getOutcomes()).contains(outcome);
+      }
+      return false;
+   }
+
+   protected static Outcome getEffectiveDefaultOutcome(final Source source) {
+      return (source.getOutcomes() == null || source.getOutcomes().length == 
0) ? source.getDefaultOutcome() : null;
+   }
+
+   private static DeliveryState determineDeliveryState(final Source source, 
final boolean useModified, final Exception e) {
+      Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
+
+      if (isAddressFull(e) && useModified && (outcomeSupported(source, 
Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) {
+         Modified modified = new Modified();
+         modified.setDeliveryFailed(true);
+         return modified;
+      } else {
+         if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || 
defaultOutcome instanceof Rejected) {
+            return createRejected(e);
+         } else if (source.getDefaultOutcome() instanceof DeliveryState) {
+            return ((DeliveryState) source.getDefaultOutcome());
+         } else {
+            // The AMQP specification requires that Accepted is returned for 
this case. However there exist
+            // implementations that set neither outcomes/default-outcome but 
use/expect for full range of outcomes.
+            // To maintain compatibility with these implementations, we 
maintain previous behaviour.
+            return createRejected(e);
+         }
+      }
+   }
+
+   private static Rejected createRejected(final Exception e) {
+      ErrorCondition condition = new ErrorCondition();
+
+      if (e instanceof ActiveMQSecurityException) {
+         condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+      } else if (isAddressFull(e)) {
+         condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
+      } else {
+         condition.setCondition(Symbol.valueOf("failed"));
+      }
+      condition.setDescription(e.getMessage());
+
+      Rejected rejected = new Rejected();
+      rejected.setError(condition);
+
+      return rejected;
+   }
+
    public static boolean isBellowThreshold(int credit, int pending, int 
threshold) {
       return credit <= threshold - pending;
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 2ef953d3b3..dae8fe135c 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -18,13 +18,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
 
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
-
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -43,14 +39,7 @@ import 
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
@@ -235,14 +224,6 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
       }
    }
 
-   public void deliveryFailed(Delivery delivery, Receiver receiver, Exception 
e) {
-      connection.runNow(() -> {
-         delivery.disposition(determineDeliveryState(((Source) 
receiver.getSource()), useModified, e));
-         settle(delivery);
-         connection.flush();
-      });
-   }
-
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       super.close(remoteLinkClose);
@@ -266,28 +247,6 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
       return 
Objects.requireNonNullElse(sessionSPI.getRoutingTypeFromPrefix(address, 
sessionSPI.getDefaultRoutingType(address)), 
ActiveMQDefaultConfiguration.getDefaultRoutingType());
    }
 
-   private static DeliveryState determineDeliveryState(final Source source, 
final boolean useModified, final Exception e) {
-      Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
-
-      if (isAddressFull(e) && useModified &&
-          (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) || 
defaultOutcome instanceof Modified)) {
-         Modified modified = new Modified();
-         modified.setDeliveryFailed(true);
-         return modified;
-      } else {
-         if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) || 
defaultOutcome instanceof Rejected) {
-            return createRejected(e);
-         } else if (source.getDefaultOutcome() instanceof DeliveryState) {
-            return ((DeliveryState) source.getDefaultOutcome());
-         } else {
-            // The AMQP specification requires that Accepted is returned for 
this case. However there exist
-            // implementations that set neither outcomes/default-outcome but 
use/expect for full range of outcomes.
-            // To maintain compatibility with these implementations, we 
maintain previous behaviour.
-            return createRejected(e);
-         }
-      }
-   }
-
    private static RoutingType getExplicitRoutingType(Symbol[] symbols) {
       if (symbols != null) {
          for (Symbol symbol : symbols) {
@@ -300,37 +259,4 @@ public class ProtonServerReceiverContext extends 
ProtonAbstractReceiver {
       }
       return null;
    }
-
-   private static Rejected createRejected(final Exception e) {
-      ErrorCondition condition = new ErrorCondition();
-
-      if (e instanceof ActiveMQSecurityException) {
-         condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
-      } else if (isAddressFull(e)) {
-         condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
-      } else {
-         condition.setCondition(Symbol.valueOf("failed"));
-      }
-      condition.setDescription(e.getMessage());
-
-      Rejected rejected = new Rejected();
-      rejected.setError(condition);
-
-      return rejected;
-   }
-
-   private static boolean isAddressFull(final Exception e) {
-      return e instanceof ActiveMQException amqe && 
ActiveMQExceptionType.ADDRESS_FULL.equals(amqe.getType());
-   }
-
-   private static boolean outcomeSupported(final Source source, final Symbol 
outcome) {
-      if (source != null && source.getOutcomes() != null) {
-         return Arrays.asList((source).getOutcomes()).contains(outcome);
-      }
-      return false;
-   }
-
-   private static Outcome getEffectiveDefaultOutcome(final Source source) {
-      return (source.getOutcomes() == null || source.getOutcomes().length == 
0) ? source.getDefaultOutcome() : null;
-   }
 }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
index b965c4a50e..e6cc880044 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -17,6 +17,9 @@
 package org.apache.activemq.artemis.protocol.amqp.proton;
 
 import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import io.netty.buffer.Unpooled;
@@ -24,6 +27,7 @@ import 
org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import 
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.RoutingContext;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -34,11 +38,13 @@ import 
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
 import 
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
 import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.utils.Wait;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.Modified;
 import org.apache.qpid.proton.amqp.messaging.Outcome;
 import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.transport.DeliveryState;
 import org.apache.qpid.proton.codec.ReadableBuffer;
@@ -179,7 +185,10 @@ public class ProtonServerReceiverContextTest {
       verify(mockDelivery, times(1)).settle();
 
       verify(mockReceiver, times(1)).getDrain();
-      if (!drain) {
+      if (drain) {
+         verify(mockReceiver, times(1)).getQueued();
+         verify(mockReceiver, times(1)).getCredit();
+      } else {
          verify(mockReceiver, times(1)).flow(1);
       }
       verifyNoMoreInteractions(mockReceiver);
@@ -243,6 +252,116 @@ public class ProtonServerReceiverContextTest {
       assertEquals(900, 
ProtonServerReceiverContext.calculatedUpdateRefill(2000, 1000, 100));
    }
 
+   @Test
+   public void testStopDrainsOffCredit() throws Exception {
+      final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
+
+      try {
+         ActiveMQServer mockServer = mock(ActiveMQServer.class);
+         when(mockServer.getScheduledPool()).thenReturn(scheduler);
+
+         Receiver mockReceiver = mock(Receiver.class);
+         AMQPConnectionContext mockConnContext = 
mock(AMQPConnectionContext.class);
+
+         when(mockConnContext.getAmqpCredits()).thenReturn(100);
+         when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
+
+         
when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
+
+         AMQPSessionCallback mockSessionSpi = mock(AMQPSessionCallback.class);
+         when(mockSessionSpi.getStorageManager()).thenReturn(new 
NullStorageManager());
+         when(mockSessionSpi.createStandardMessage(any(), 
any())).thenAnswer((Answer<AMQPStandardMessage>) invocation -> new 
AMQPStandardMessage(0, createAMQPMessageBuffer(), null, null));
+
+         AMQPSessionContext mockProtonContext = mock(AMQPSessionContext.class);
+         when(mockProtonContext.getServer()).thenReturn(mockServer);
+         when(mockProtonContext.getSessionSPI()).thenReturn(mockSessionSpi);
+
+         ProtonServerReceiverContext rc = new 
ProtonServerReceiverContext(mockSessionSpi, mockConnContext, mockProtonContext, 
mockReceiver);
+
+         AtomicBoolean stopped = new AtomicBoolean();
+
+         when(mockReceiver.getCredit()).thenReturn(100);
+
+         rc.stop(10_000, (rcvr, didStop) -> stopped.set(true));
+
+         assertFalse(stopped.get());
+
+         verify(mockReceiver, times(1)).drain(0);
+         verify(mockReceiver, times(1)).draining();
+         verify(mockReceiver, times(2)).getCredit();
+         verify(mockReceiver, times(1)).getQueued();
+         verify(mockReceiver, times(1)).drain(0);
+      } finally {
+         scheduler.shutdown();
+      }
+   }
+
+   @Test
+   public void testDrainInitiatedWhenDeliveryMarkedFailed() throws Exception {
+      final ScheduledExecutorService scheduler = 
Executors.newScheduledThreadPool(1);
+
+      try {
+         ActiveMQServer mockServer = mock(ActiveMQServer.class);
+         when(mockServer.getScheduledPool()).thenReturn(scheduler);
+
+         Source source = new Source();
+         source.setOutcomes(new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, 
Rejected.DESCRIPTOR_SYMBOL,
+                                         Modified.DESCRIPTOR_SYMBOL, 
Released.DESCRIPTOR_SYMBOL});
+
+         Receiver mockReceiver = mock(Receiver.class);
+         when(mockReceiver.getSource()).thenReturn(source);
+
+         ProtonProtocolManager mockProtocolManager = 
mock(ProtonProtocolManager.class);
+         when(mockProtocolManager.getLinkQuiesceTimeout()).thenReturn(100);
+         
when(mockProtocolManager.isDrainOnTransientDeliveryErrors()).thenReturn(true);
+
+         AMQPConnectionContext mockConnContext = 
mock(AMQPConnectionContext.class);
+         when(mockConnContext.getAmqpCredits()).thenReturn(100);
+         when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
+         
when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager);
+
+         doAnswer(invocation -> {
+            final Runnable target = invocation.getArgument(0, Runnable.class);
+            target.run();
+            return null;
+         }).when(mockConnContext).runNow(any(Runnable.class));
+
+         doAnswer(invocation -> {
+            final Runnable target = invocation.getArgument(0, Runnable.class);
+            target.run();
+            return null;
+         }).when(mockConnContext).runLater(any(Runnable.class));
+
+         AMQPSessionCallback mockSessionSpi = mock(AMQPSessionCallback.class);
+         when(mockSessionSpi.getStorageManager()).thenReturn(new 
NullStorageManager());
+         when(mockSessionSpi.createStandardMessage(any(), 
any())).thenAnswer((Answer<AMQPStandardMessage>) invocation -> new 
AMQPStandardMessage(0, createAMQPMessageBuffer(), null, null));
+
+         AMQPSessionContext mockProtonContext = mock(AMQPSessionContext.class);
+         when(mockProtonContext.getServer()).thenReturn(mockServer);
+         when(mockProtonContext.getSessionSPI()).thenReturn(mockSessionSpi);
+
+         ProtonServerReceiverContext rc = new 
ProtonServerReceiverContext(mockSessionSpi, mockConnContext, mockProtonContext, 
mockReceiver);
+
+         when(mockReceiver.getCredit()).thenReturn(100);
+
+         Delivery mockDelivery = mock(Delivery.class);
+         when(mockDelivery.getLink()).thenReturn(mockReceiver);
+         when(mockReceiver.current()).thenReturn(mockDelivery);
+
+         rc.incrementSettle();
+         rc.deliveryFailed(mockDelivery, mockReceiver, new 
ActiveMQAddressFullException("full"));
+
+         Wait.assertTrue(() -> rc.isClosed(), 5000, 50);
+
+         verify(mockReceiver, times(1)).drain(0);
+         verify(mockReceiver, times(2)).getCredit();
+         verify(mockReceiver, times(1)).drain(0);
+         verify(mockReceiver, times(1)).getSource();
+      } finally {
+         scheduler.shutdown();
+      }
+   }
+
    private ReadableBuffer createAMQPMessageBuffer() {
       MessageImpl message = (MessageImpl) Message.Factory.create();
       message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDrainOnNoSpaceForSendsTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDrainOnNoSpaceForSendsTest.java
new file mode 100644
index 0000000000..9bd9a47a11
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDrainOnNoSpaceForSendsTest.java
@@ -0,0 +1,1091 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.proton.amqp.transport.LinkError;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.jupiter.api.Test;
+
+public class AmqpDrainOnNoSpaceForSendsTest extends AmqpClientTestSupport {
+
+   @Override
+   protected String getConfiguredProtocols() {
+      return "AMQP";
+   }
+
+   @Override
+   protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
+      tc.getParams().put("amqpCredits", "3");
+      tc.getParams().put("amqpLowCredits", "1");
+   }
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch("#");
+      
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+      addressSettings.setMaxSizeBytes(500);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+   }
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      // Creates the broker used to make the outgoing connection. The port 
passed is for
+      // that brokers acceptor. The test server connected to by the broker 
binds to a random port.
+      return createServer(AMQP_PORT, false);
+   }
+
+   @Test
+   public void testDrainCreditOnClientSendFailForMulticastAddress() throws 
Exception {
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender").now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().rejected();
+         peer.expectDetach();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+                              .withBody().withString("Third Message: ")
+                              .also()
+                              .withDeliveryId(3)
+                              .later(10);
+         peer.remoteDetach().later(15);
+
+         // Should be no new flow since the address remains full
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void testDrainCreditOnClientSendFailForAnyCastAddress() throws 
Exception {
+      server.start();
+      
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+                                                             
.setAddress(getTestName())
+                                                             
.setAutoCreated(false));
+
+      Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100);
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender").now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().withAddress(getTestName())
+                                         .withCapabilities("queue").also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().rejected();
+         peer.expectDetach();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+                              .withBody().withString("Third Message: ")
+                              .also()
+                              .withDeliveryId(3)
+                              .later(10);
+         peer.remoteDetach().later(15);
+
+         // Should be no new flow since the address remains full
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void 
testDoesDrainCreditOnClientSendFailForAnonymousRelaySenderForMatchedAddressPolicy()
 throws Exception {
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender")
+                          
.withDesiredCapabilities(AmqpSupport.ANONYMOUS_RELAY.toString()).now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+         final Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+         final Queue queue2 = server.locateQueue(SimpleString.of("queue2"));
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+                              .withBody().withString("Third Message: ")
+                              .also()
+                              .withDeliveryId(3)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(3);
+
+         // Make capacity again which should now replenish credit.
+         queue1.deleteQueue();
+         queue2.deleteQueue();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().accepted();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "4").also()
+                              .withBody().withString("Fourth Message: ")
+                              .also()
+                              .withDeliveryId(4)
+                              .later(10);
+
+         // Should be no new flow since the address remains full
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void 
testDrainedAnonymousRelaySenderCanBurnCreditSendingToOtherAddresses() throws 
Exception {
+      final String queueNameA = getTestName() + "-A";
+      final String queueNameB = getTestName() + "-B";
+
+      AddressSettings addressSettings1 = 
server.getAddressSettingsRepository().getMatch(queueNameA);
+      
addressSettings1.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+      addressSettings1.setMaxSizeBytes(500);
+      server.getAddressSettingsRepository().addMatch(queueNameA, 
addressSettings1);
+
+      AddressSettings addressSettings2 = 
server.getAddressSettingsRepository().getMatch(queueNameB);
+      
addressSettings2.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+      addressSettings2.setMaxSizeBytes(500);
+      server.getAddressSettingsRepository().addMatch(queueNameB, 
addressSettings2);
+
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender")
+                          
.withDesiredCapabilities(AmqpSupport.ANONYMOUS_RELAY.toString()).now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of(queueNameA).setRoutingType(RoutingType.ANYCAST)
+                                                             
.setAddress(queueNameA)
+                                                             
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of(queueNameB).setRoutingType(RoutingType.MULTICAST)
+                                                             
.setAddress(queueNameB)
+                                                             
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(queueNameA)).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(queueNameB)).isExists(), 5000, 100);
+
+         final Queue queue1 = server.locateQueue(SimpleString.of(queueNameA));
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameA).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameA).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().accepted();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameB).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+                              .withBody().withString("Third Message: ")
+                              .also()
+                              .withDeliveryId(3)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(3);
+
+         // Make capacity again which should now replenish credit.
+         queue1.deleteQueue();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().accepted();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameA).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "4").also()
+                              .withBody().withString("Fourth Message: ")
+                              .also()
+                              .withDeliveryId(4)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void 
testDrainedAnonymousRelaySenderCreditNotReplenishedOnBurnOfExistingCredit() 
throws Exception {
+      final String queueNameA = getTestName() + "-A";
+      final String queueNameB = getTestName() + "-B";
+
+      AddressSettings addressSettings1 = 
server.getAddressSettingsRepository().getMatch(queueNameA);
+      
addressSettings1.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+      addressSettings1.setMaxSizeBytes(500);
+      server.getAddressSettingsRepository().addMatch(queueNameA, 
addressSettings1);
+
+      AddressSettings addressSettings2 = 
server.getAddressSettingsRepository().getMatch(queueNameB);
+      
addressSettings2.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+      addressSettings2.setMaxSizeBytes(500);
+      server.getAddressSettingsRepository().addMatch(queueNameB, 
addressSettings2);
+
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
peer.expectOpen().withOfferedCapability(AmqpSupport.ANONYMOUS_RELAY.toString());
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender")
+                          
.withDesiredCapabilities(AmqpSupport.ANONYMOUS_RELAY.toString()).now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of(queueNameA).setRoutingType(RoutingType.ANYCAST)
+                                                             
.setAddress(queueNameA)
+                                                             
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of(queueNameB).setRoutingType(RoutingType.MULTICAST)
+                                                             
.setAddress(queueNameB)
+                                                             
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(queueNameA)).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(queueNameB)).isExists(), 5000, 100);
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameA).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameA).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().accepted();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameB).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+                              .withBody().withString("Third Message: ")
+                              .also()
+                              .withDeliveryId(3)
+                              .later(10);
+
+         // Create some traffic to ensure we don't see unexpected flow frames
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().respond();
+         peer.remoteAttach().ofReceiver()
+                            .withName("receiving-peer")
+                            .withSource().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withTarget().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDetach().respond();
+         peer.remoteDetach().later(1);
+
+         // Should be no new flow since the address remains full
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void 
testDrainAnonymousRelayOnceBlockPolicyHitsRejectMaxSizeThreshold() throws 
Exception {
+      final String queueNameA = getTestName() + "A";
+      final String queueNameB = getTestName() + "B";
+
+      AddressSettings addressSettings1 = 
server.getAddressSettingsRepository().getMatch(queueNameA);
+      
addressSettings1.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      addressSettings1.setMaxSizeBytes(500);
+      addressSettings1.setMaxSizeBytesRejectThreshold(1000);
+      server.getAddressSettingsRepository().addMatch(queueNameA, 
addressSettings1);
+
+      AddressSettings addressSettings2 = 
server.getAddressSettingsRepository().getMatch(queueNameB);
+      
addressSettings2.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      addressSettings2.setMaxSizeBytes(500);
+      addressSettings2.setMaxSizeBytesRejectThreshold(1000);
+      server.getAddressSettingsRepository().addMatch(queueNameB, 
addressSettings2);
+
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
peer.expectOpen().withOfferedCapability(AmqpSupport.ANONYMOUS_RELAY.toString());
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender")
+                          
.withDesiredCapabilities(AmqpSupport.ANONYMOUS_RELAY.toString()).now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of(queueNameA).setRoutingType(RoutingType.MULTICAST)
+                                                             
.setAddress(queueNameA)
+                                                             
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of(queueNameB).setRoutingType(RoutingType.MULTICAST)
+                                                             
.setAddress(queueNameB)
+                                                             
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(queueNameA)).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of(queueNameB)).isExists(), 5000, 100);
+
+         Queue queue1 = server.locateQueue(SimpleString.of(queueNameA));
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(1024);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameA).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(queueNameA).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         // Make capacity again which should now replenish credit once remote 
send drain response.
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender();
+         peer.expectDetach();
+         peer.remoteAttach().ofReceiver()
+                            .withName("receiving-peer")
+                            .withSource().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withTarget().also()
+                            .now();
+         peer.remoteDetach().later(1);
+
+         queue1.deleteQueue();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+         // Respond drained with one credit outstanding, remote should then 
grant a new batch
+         
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(3).withDrain(true).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void 
testDoesNotDrainCreditOnClientSendFailIfAcceptorConfiguredNotTo() throws 
Exception {
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      server.getConfiguration().addAcceptorConfiguration("server",
+         "tcp://localhost:" + AMQP_PORT + 
"?amqpCredits=3&amqpLowCredits=0&amqpDrainOnTransientDeliveryErrors=false");
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender").now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+         final Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+         final Queue queue2 = server.locateQueue(SimpleString.of("queue2"));
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+                              .withBody().withString("Third Message: ")
+                              .also()
+                              .withDeliveryId(3)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(3);
+
+         // Make capacity again which should now replenish credit.
+         queue1.deleteQueue();
+         queue2.deleteQueue();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDisposition().withState().accepted();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "4").also()
+                              .withBody().withString("Fourth Message: ")
+                              .also()
+                              .withDeliveryId(4)
+                              .later(10);
+
+         // Should be no new flow since the address remains full
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void 
testDrainCreditOnClientSendFailPeerAnsweresDrainedButFlowsOnlyAfterSpaceCleared()
 throws Exception {
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender").now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+         final Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+         final Queue queue2 = server.locateQueue(SimpleString.of("queue2"));
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(3).withDrain(true).now();
+
+         // The address should still be full so nothing expected until the 
queues are deleted
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+         // Make capacity again which should now replenish credit.
+         queue1.deleteQueue();
+         queue2.deleteQueue();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void 
testDrainCreditOnClientSendFailNoNewCreditUntilPeerAnswersDrained() throws 
Exception {
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender").now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+
+         Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         // Make capacity again which should now replenish credit once remote 
send drain response.
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         queue1.deleteQueue();
+
+         peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+         // Respond drained with one credit outstanding, remote should then 
grant a new batch
+         
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(3).withDrain(true).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void testSenderClosedIfRemoteTimesOutWaitingForDrainToComplete() 
throws Exception {
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      server.getConfiguration().addAcceptorConfiguration("server",
+         "tcp://localhost:" + AMQP_PORT + 
"?amqpCredits=3&amqpLowCredits=0&amqpLinkQuiesceTimeout=1");
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender").now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+         
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(100);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+         peer.expectDetach().withError(LinkError.DETACH_FORCED.toString());
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              .withProperties().withTo(getTestName()).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         // Create some traffic to ensure we don't see unexpected flow frames
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender().respond();
+         peer.remoteAttach().ofReceiver()
+                            .withName("receiving-peer")
+                            .withSource().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withTarget().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectDetach().respond();
+         peer.remoteDetach().later(1);
+
+         // Should be no new flow since the address remains full
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+
+   @Test
+   public void testDrainOnceBlockPolicyHitsRejectMaxSizeThreshold() throws 
Exception {
+      AddressSettings addressSettings = 
server.getAddressSettingsRepository().getMatch("#");
+      
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+      addressSettings.setMaxSizeBytes(500);
+      addressSettings.setMaxSizeBytesRejectThreshold(1000);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+      server.start();
+
+      try (ProtonTestClient peer = new ProtonTestClient()) {
+         peer.queueClientSaslAnonymousConnect();
+         peer.connect("localhost", AMQP_PORT);
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         peer.expectOpen();
+         peer.expectBegin();
+         peer.expectAttach().ofReceiver();
+         peer.expectFlow().withLinkCredit(3);
+
+         peer.remoteOpen().withContainerId("test-sender").now();
+         peer.remoteBegin().now();
+         peer.remoteAttach().ofSender()
+                            .withInitialDeliveryCount(0)
+                            .withName("sending-peer")
+                            .withTarget().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withSource().also()
+                            .now();
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+         
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+                                                           
.setAddress(getTestName())
+                                                           
.setAutoCreated(false));
+
+         Wait.assertTrue(() -> 
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+
+         Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+
+         peer.expectDisposition().withState().accepted();
+
+         final String payload = "A".repeat(1024);
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+                              .withBody().withString("First Message: " + 
payload)
+                              .also()
+                              .withDeliveryId(1)
+                              .now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(1).withDrain(true);
+         peer.expectDisposition().withState().rejected();
+
+         peer.remoteTransfer().withHeader().withDurability(true).also()
+                              
.withApplicationProperties().withProperty("color", "red").also()
+                              
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+                              .withBody().withString("Second Message: ")
+                              .also()
+                              .withDeliveryId(2)
+                              .later(10);
+
+         // Make capacity again which should now replenish credit once remote 
sends drain response.
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofSender();
+         peer.expectDetach();
+         peer.remoteAttach().ofReceiver()
+                            .withName("receiving-peer")
+                            .withSource().withAddress(getTestName())
+                                         .withCapabilities("topic").also()
+                            .withTarget().also()
+                            .now();
+         peer.remoteDetach().later(1);
+
+         queue1.deleteQueue();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+         // Respond drained with one credit outstanding, remote should then 
grant a new batch
+         
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(3).withDrain(true).now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectClose();
+         peer.remoteClose().now();
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+      }
+   }
+}
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index cb38db93ce..542f2c81e6 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -6319,6 +6319,12 @@ public class AMQPFederationAddressPolicyTest extends 
AmqpClientTestSupport {
 
          peer.expectDisposition().withState().accepted(); // This should fill 
the address
 
+         // In either configuration we drain the link credit to avoid repeated 
sends of the failed
+         // deliveries where possible.
+         peer.expectFlow().withDrain(true).withLinkCredit(998)
+                          .respond()
+                          
.withDrain(true).withLinkCredit(0).withDeliveryCount(998).afterDelay(5);
+
          // If sending modified the remote won't discard the message, it will 
send it again
          if (useModifiedForReject) {
             peer.expectDisposition().withState().modified(true);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index fa09db9be8..2c08c2a79c 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -4990,6 +4990,108 @@ public class AMQPFederationQueuePolicyTest extends 
AmqpClientTestSupport {
       }
    }
 
+   @Test
+   @Timeout(20)
+   public void 
testFederationGrantsCreditOnNewReceiverAfterLinkQuiescedByMessagesConsumingDrainedCredit()
 throws Exception {
+      try (ProtonTestServer peer = new ProtonTestServer()) {
+         peer.expectSASLAnonymousConnect();
+         peer.expectOpen().respond();
+         peer.expectBegin().respond();
+         peer.expectAttach().ofSender()
+                            
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+                            .respondInKind();
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(10);
+         peer.start();
+
+         final URI remoteURI = peer.getServerURI();
+         logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+         final AMQPFederationQueuePolicyElement receiveFromQueue = new 
AMQPFederationQueuePolicyElement();
+         receiveFromQueue.setName("queue-policy");
+         receiveFromQueue.addToIncludes(getTestName(), getTestName());
+         receiveFromQueue.addProperty(RECEIVER_QUIESCE_TIMEOUT, 10_000);
+         receiveFromQueue.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 10_000);
+         receiveFromQueue.addProperty("amqpCredits", "3");
+         receiveFromQueue.addProperty("amqpCreditsLow", "0");
+
+         final AMQPFederatedBrokerConnectionElement element = new 
AMQPFederatedBrokerConnectionElement();
+         element.setName(getTestName());
+         element.addLocalQueuePolicy(receiveFromQueue);
+
+         final AMQPBrokerConnectConfiguration amqpConnection =
+            new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + 
remoteURI.getHost() + ":" + remoteURI.getPort());
+         amqpConnection.setReconnectAttempts(0);// No reconnects
+         amqpConnection.addElement(element);
+
+         server.getConfiguration().addAMQPConnection(amqpConnection);
+         server.start();
+         
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+                                                                
.setAddress(getTestName())
+                                                                
.setAutoCreated(false));
+
+         peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         peer.expectAttach().ofReceiver()
+                            
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+                            .withName(allOf(containsString(getTestName()),
+                                            containsString("queue-receiver"),
+                                            
containsString(server.getNodeID().toString())))
+                            .respondInKind();
+         peer.expectFlow().withLinkCredit(3);
+
+         final ConnectionFactory factory = 
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+         try (Connection connection = factory.createConnection()) {
+            final Session session = 
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            final MessageConsumer consumer = 
session.createConsumer(session.createQueue(getTestName()));
+
+            connection.start();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            // Demand is removed so expect a drain, respond to drain to 
quiesce the link
+            // which should leave it idling and ready for recovery by next 
consumer.
+            peer.expectFlow().withLinkCredit(3).withDrain(true);
+
+            // Burn off credit with messages instead of a flow with drained.
+            
peer.remoteTransfer().withBody().withString("test-message-1").also()
+                                 .withHeader().withDurability(true).also()
+                                 .withApplicationProperties().also()
+                                 .withMessageAnnotations().also()
+                                 .withDeliveryId(0)
+                                 .queue();
+            peer.expectDisposition().withState().accepted();
+            
peer.remoteTransfer().withBody().withString("test-message-2").also()
+                                 .withHeader().withDurability(true).also()
+                                 .withApplicationProperties().also()
+                                 .withMessageAnnotations().also()
+                                 .withDeliveryId(1)
+                                 .queue();
+            peer.expectDisposition().withState().accepted();
+            
peer.remoteTransfer().withBody().withString("test-message-3").also()
+                                 .withHeader().withDurability(true).also()
+                                 .withApplicationProperties().also()
+                                 .withMessageAnnotations().also()
+                                 .withDeliveryId(2)
+                                 .queue();
+            peer.expectDisposition().withState().accepted();
+
+            consumer.close();
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+            peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+            // New demand added before any quiesce or idle timeout can trigger
+            session.createConsumer(session.createQueue(getTestName()));
+
+            peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+         }
+
+         peer.close();
+      }
+   }
+
    private static void sendQueueAddedEvent(ProtonTestPeer peer, String 
address, String queue, int handle, int deliveryId) {
       final Map<String, Object> eventMap = new LinkedHashMap<>();
       eventMap.put(REQUESTED_ADDRESS_NAME, address);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to