This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new f30029f04a ARTEMIS-5645 Drain the receiver link if address is
rejecting messages
f30029f04a is described below
commit f30029f04a87202ac8e928b425080eb501446686
Author: Timothy Bish <[email protected]>
AuthorDate: Wed Sep 3 10:17:09 2025 -0400
ARTEMIS-5645 Drain the receiver link if address is rejecting messages
When an address policy limits the incoming messages and starts to reject
them
the AMQP link should drain remote credit to limit the remote from sending
new
messages until the address has space again in which case new credit will be
granted.
---
.../amqp/broker/ProtonProtocolManager.java | 47 +-
.../artemis/protocol/amqp/proton/AmqpSupport.java | 6 +
.../amqp/proton/ProtonAbstractReceiver.java | 188 +++-
.../amqp/proton/ProtonServerReceiverContext.java | 74 --
.../proton/ProtonServerReceiverContextTest.java | 121 ++-
.../amqp/AmqpDrainOnNoSpaceForSendsTest.java | 1091 ++++++++++++++++++++
.../connect/AMQPFederationAddressPolicyTest.java | 6 +
.../connect/AMQPFederationQueuePolicyTest.java | 102 ++
8 files changed, 1535 insertions(+), 100 deletions(-)
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index af26b5f8a5..b5f682a717 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -96,6 +96,8 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
private boolean amqpUseModifiedForTransientDeliveryErrors =
AmqpSupport.AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS;
+ private boolean amqpDrainOnTransientDeliveryErrors =
AmqpSupport.AMQP_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS;
+
// If set true, a reject disposition will be treated as if it were an
unmodified disposition with the
// delivery-failed flag set true.
private boolean amqpTreatRejectAsUnmodifiedDeliveryFailed =
AmqpSupport.AMQP_TREAT_REJECT_AS_UNMODIFIED_DELIVERY_FAILURE;
@@ -106,6 +108,8 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
private String saslLoginConfigScope = "amqp-sasl-gssapi";
+ private int amqpLinkQuiesceTimeout = AmqpSupport.AMQP_LINK_QUIESCE_TIMEOUT;
+
private Long amqpIdleTimeout;
private long ackManagerFlushTimeout = 10_000;
@@ -449,7 +453,6 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
return this;
}
-
public void setAmqpTreatRejectAsUnmodifiedDeliveryFailed(final boolean
amqpTreatRejectAsUnmodifiedDeliveryFailed) {
this.amqpTreatRejectAsUnmodifiedDeliveryFailed =
amqpTreatRejectAsUnmodifiedDeliveryFailed;
}
@@ -457,4 +460,46 @@ public class ProtonProtocolManager extends
AbstractProtocolManager<AMQPMessage,
public boolean isAmqpTreatRejectAsUnmodifiedDeliveryFailed() {
return this.amqpTreatRejectAsUnmodifiedDeliveryFailed;
}
+
+ /**
+ * {@return true if transient delivery errors should be handled by draining
link credit from the remote sender}
+ */
+ public boolean isDrainOnTransientDeliveryErrors() {
+ return this.amqpDrainOnTransientDeliveryErrors;
+ }
+
+ /**
+ * Sets if transient delivery errors should be handled by draining link
credit from the remote sender
+ *
+ * @param amqpDrainOnTransientDeliveryErrors
+ * Set to <code>true</code> if senders should be drained on transient
delivery errors.
+ *
+ * @return this protocol manager instance.
+ */
+ public ProtonProtocolManager setAmqpDrainOnTransientDeliveryErrors(boolean
amqpDrainOnTransientDeliveryErrors) {
+ this.amqpDrainOnTransientDeliveryErrors =
amqpDrainOnTransientDeliveryErrors;
+ return this;
+ }
+
+ /**
+ * {@return the time in milliseconds to wait for remote sender once a link
quiesce is initiated by this peer}
+ */
+ public int getLinkQuiesceTimeout() {
+ return amqpLinkQuiesceTimeout;
+ }
+
+ /**
+ * Sets the time in milliseconds to wait before closing a remote sender
link if the server has requested
+ * that the link drain all outstanding credit an complete pending
settlements. A value less than or equal
+ * to zero disables drain timeouts.
+ *
+ * @param amqpLinkQuiesceTimeout
+ * The time in milliseconds to wait for quiesce to complete.
+ *
+ * @return this protocol manager instance.
+ */
+ public ProtonProtocolManager setAmqpLinkQuiesceTimeout(int
amqpLinkQuiesceTimeout) {
+ this.amqpLinkQuiesceTimeout = amqpLinkQuiesceTimeout;
+ return this;
+ }
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
index f5ce4cc1a8..c0eab92fb9 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java
@@ -49,6 +49,12 @@ public class AmqpSupport {
// Defaults for controlling the behaviour of AMQP dispositions
public static final boolean AMQP_USE_MODIFIED_FOR_TRANSIENT_DELIVERY_ERRORS
= false;
+ // Defaults for controlling the behaviour of AMQP credit draining on
resource exhaustion
+ public static final boolean AMQP_DRAIN_ON_TRANSIENT_DELIVERY_ERRORS = true;
+
+ // Default timeout (in milliseconds) before closing a link as failed if a
quiesce is initiated from this peer
+ public static final int AMQP_LINK_QUIESCE_TIMEOUT = 600_000; // ten minutes
+
// Identification values used to locating JMS selector types.
public static final Symbol JMS_SELECTOR_KEY =
Symbol.valueOf("jms-selector");
public static final UnsignedLong JMS_SELECTOR_CODE =
UnsignedLong.valueOf(0x0000468C00000004L);
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
index 104350c94c..71cea48afc 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonAbstractReceiver.java
@@ -20,11 +20,15 @@ import static
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessa
import static
org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@@ -35,10 +39,17 @@ import
org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Modified;
+import org.apache.qpid.proton.amqp.messaging.Outcome;
+import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transaction.TransactionalState;
import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.amqp.transport.LinkError;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
@@ -50,6 +61,7 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
protected enum ReceiverState {
STARTED,
+ DRAINING,
STOPPING,
STOPPED,
CLOSED
@@ -67,13 +79,15 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
protected final MessageReader largeMessageReader = new
AMQPLargeMessageReader(this);
protected final Runnable creditRunnable;
protected final boolean useModified;
+ protected final boolean drainCreditOnNoSpace;
+ protected final long drainTimeout;
protected final Runnable creditTopUpRunner = this::doCreditTopUpRun;
protected volatile MessageReader messageReader;
protected int pendingSettles = 0;
protected volatile ReceiverState state = ReceiverState.STARTED;
protected BiConsumer<ProtonAbstractReceiver, Boolean> pendingStop;
- protected ScheduledFuture<?> pendingStopTimeout;
+ protected ScheduledFuture<?> pendingQuiesceTimeout;
// Not always used so left unallocated until needed, on attach the
capabilities drive if supported
protected MessageReader coreMessageReader;
@@ -90,7 +104,9 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
this.receiver = receiver;
this.minLargeMessageSize = getConfiguredMinLargeMessageSize(connection);
this.creditRunnable = createCreditRunnable(connection);
- this.useModified =
this.connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
+ this.useModified =
connection.getProtocolManager().isUseModifiedForTransientDeliveryErrors();
+ this.drainCreditOnNoSpace =
connection.getProtocolManager().isDrainOnTransientDeliveryErrors();
+ this.drainTimeout =
connection.getProtocolManager().getLinkQuiesceTimeout();
this.routingContext = new
RoutingContextImpl(null).setDuplicateDetection(connection.getProtocolManager().isAmqpDuplicateDetection());
}
@@ -128,25 +144,38 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
* an asynchronous call invalid. The stop call allows a timeout to be
specified which will signal the stopped
* consumer if the timeout elapses and leaves the receiver in the stopping
state which does not allow for a restart.
*
- * @param stopTimeout A time in milliseconds to wait for the stop to
complete before considering it as having
- * failed.
- * @param onStopped A consumer that is signaled once the receiver has
stopped or the timeout elapsed.
+ * @param stopTimeout
+ * A time in milliseconds to wait for the stop to complete before
considering it as having failed.
+ * @param onStopped
+ * A consumer that is signaled once the receiver has stopped or the
timeout elapsed.
+ *
* @throws IllegalStateException if the receiver is currently in the
stopping state.
*/
public void stop(int stopTimeout, BiConsumer<ProtonAbstractReceiver,
Boolean> onStopped) {
Objects.requireNonNull(onStopped, "The stopped callback must not be
null");
connection.requireInHandler();
- if (isStarted()) {
+ if (state.ordinal() < ReceiverState.STOPPING.ordinal()) {
state = ReceiverState.STOPPING;
pendingStop = onStopped;
- if (!checkIfPendingStopCanComplete()) {
- if (receiver.getCredit() != 0) {
+
+ if (!tryCompletePendingQuiesce()) {
+ // The receiver could already be draining but if not we trigger a
drain to handle
+ // the run off of pending messages from the remote before
reporting stopped.
+ if (receiver.getCredit() != 0 && !receiver.draining()) {
receiver.drain(0);
}
if (stopTimeout > 0) {
- pendingStopTimeout =
protonSession.getServer().getScheduledPool().schedule(() -> {
+ // Take over any pending timeout setup from a drain operation
with one with the given
+ // stop timeout otherwise the pending drain timeout will kick
in and will signal the
+ // stop callback indicating that stop failed.
+ if (pendingQuiesceTimeout != null) {
+ pendingQuiesceTimeout.cancel(false);
+ pendingQuiesceTimeout = null;
+ }
+
+ pendingQuiesceTimeout =
protonSession.getServer().getScheduledPool().schedule(() -> {
connection.runNow(() -> signalStoppedCallback(false));
}, stopTimeout, TimeUnit.MILLISECONDS);
}
@@ -183,8 +212,8 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
return state == ReceiverState.STARTED;
}
- public boolean isBusy() {
- return false;
+ public boolean isDraining() {
+ return state == ReceiverState.DRAINING;
}
public boolean isStopping() {
@@ -199,6 +228,10 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
return state == ReceiverState.CLOSED;
}
+ public boolean isBusy() {
+ return false;
+ }
+
/**
* Set the proper operation context in the Thread Local.
* Return the old context*/
@@ -245,6 +278,7 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
* @param receiver The proton receiver that will have its credit refilled
* @param connection The connection that own the receiver
* @param context The context that will be associated with the receiver
+ *
* @return A new Runnable that can be used to keep receiver credit
replenished
*/
public static Runnable createCreditRunnable(int refill,
@@ -320,16 +354,27 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
if (isStarted()) {
topUpCreditIfNeeded();
} else {
- checkIfPendingStopCanComplete();
+ tryCompletePendingQuiesce();
}
}
- private boolean checkIfPendingStopCanComplete() {
- if (isStopping() && pendingSettles == 0 && receiver.getQueued() == 0 &&
receiver.getCredit() == 0) {
- state = ReceiverState.STOPPED;
- signalStoppedCallback(true);
+ private boolean tryCompletePendingQuiesce() {
+ if (pendingSettles == 0 && receiver.getQueued() == 0 &&
receiver.getCredit() == 0) {
+ if (isStopping()) {
+ state = ReceiverState.STOPPED;
+ signalStoppedCallback(true);
+ return true;
+ } else if (isDraining()) {
+ state = ReceiverState.STARTED;
+
+ if (pendingQuiesceTimeout != null) {
+ pendingQuiesceTimeout.cancel(false);
+ pendingQuiesceTimeout = null;
+ }
- return true;
+ topUpCreditIfNeeded();
+ return true;
+ }
}
return false;
@@ -337,10 +382,10 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
@Override
public void onFlow(int credits, boolean drain) {
- if (isStopping()) {
- checkIfPendingStopCanComplete();
- } else {
+ if (isStarted()) {
topUpCreditIfNeeded();
+ } else {
+ tryCompletePendingQuiesce();
}
}
@@ -359,7 +404,7 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
if (!receiver.getDrain() && isStarted()) {
receiver.flow(1);
} else {
- checkIfPendingStopCanComplete();
+ tryCompletePendingQuiesce();
}
}
@@ -477,6 +522,47 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
});
}
+ public void deliveryFailed(Delivery delivery, Receiver receiver, Exception
e) {
+ connection.runNow(() -> {
+ if (drainCreditOnNoSpace && isStarted() && receiver.getCredit() > 0
&& isAddressFull(e)) {
+ // Only when started do we want to drain off credit when the
address is full since we
+ // will have already reacted to a stop or close. Once drained this
will return to the
+ // started state if not stopped or closed since.
+ state = ReceiverState.DRAINING;
+ receiver.drain(0);
+
+ if (drainTimeout > 0) {
+ pendingQuiesceTimeout =
protonSession.getServer().getScheduledPool().schedule(() -> {
+ final ErrorCondition error = new
ErrorCondition(LinkError.DETACH_FORCED, "Timed out waiting for remote sender to
drain");
+
+ connection.runLater(() -> {
+ try {
+ // Accounts for possible swap to stopping where the
caller did not provide their
+ // own timeout so the stop will leave the drain
timeout in place if one was active.
+ if (isStopping()) {
+ pendingStop.accept(this, false);
+ } else {
+ try {
+ close(error);
+ } finally {
+ receiver.close();
+ }
+ }
+ } catch (ActiveMQAMQPException ex) {
+ logger.debug("Error while attempting to close receiver
that did not drain or stop in time", ex);
+ } finally {
+ connection.flush();
+ }
+ });
+ }, drainTimeout, TimeUnit.MILLISECONDS);
+ }
+ }
+ delivery.disposition(determineDeliveryState(((Source)
receiver.getSource()), useModified, e));
+ settle(delivery);
+ connection.flush();
+ });
+ }
+
/**
* {@return either the fixed address assigned to this sender, or the last
address used by an anonymous relay sender;
* if this is an anonymous relay and no send has occurred then this method
returns {@code null}}
@@ -509,9 +595,9 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
}
private void signalStoppedCallback(boolean stopped) {
- if (pendingStopTimeout != null) {
- pendingStopTimeout.cancel(false);
- pendingStopTimeout = null;
+ if (pendingQuiesceTimeout != null) {
+ pendingQuiesceTimeout.cancel(false);
+ pendingQuiesceTimeout = null;
}
if (pendingStop != null) {
@@ -556,6 +642,60 @@ public abstract class ProtonAbstractReceiver extends
ProtonInitializable impleme
}
}
+ protected static boolean isAddressFull(final Exception e) {
+ return e instanceof ActiveMQException amqe &&
ActiveMQExceptionType.ADDRESS_FULL.equals(amqe.getType());
+ }
+
+ protected static boolean outcomeSupported(final Source source, final Symbol
outcome) {
+ if (source != null && source.getOutcomes() != null) {
+ return Arrays.asList((source).getOutcomes()).contains(outcome);
+ }
+ return false;
+ }
+
+ protected static Outcome getEffectiveDefaultOutcome(final Source source) {
+ return (source.getOutcomes() == null || source.getOutcomes().length ==
0) ? source.getDefaultOutcome() : null;
+ }
+
+ private static DeliveryState determineDeliveryState(final Source source,
final boolean useModified, final Exception e) {
+ Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
+
+ if (isAddressFull(e) && useModified && (outcomeSupported(source,
Modified.DESCRIPTOR_SYMBOL) || defaultOutcome instanceof Modified)) {
+ Modified modified = new Modified();
+ modified.setDeliveryFailed(true);
+ return modified;
+ } else {
+ if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) ||
defaultOutcome instanceof Rejected) {
+ return createRejected(e);
+ } else if (source.getDefaultOutcome() instanceof DeliveryState) {
+ return ((DeliveryState) source.getDefaultOutcome());
+ } else {
+ // The AMQP specification requires that Accepted is returned for
this case. However there exist
+ // implementations that set neither outcomes/default-outcome but
use/expect for full range of outcomes.
+ // To maintain compatibility with these implementations, we
maintain previous behaviour.
+ return createRejected(e);
+ }
+ }
+ }
+
+ private static Rejected createRejected(final Exception e) {
+ ErrorCondition condition = new ErrorCondition();
+
+ if (e instanceof ActiveMQSecurityException) {
+ condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
+ } else if (isAddressFull(e)) {
+ condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
+ } else {
+ condition.setCondition(Symbol.valueOf("failed"));
+ }
+ condition.setDescription(e.getMessage());
+
+ Rejected rejected = new Rejected();
+ rejected.setError(condition);
+
+ return rejected;
+ }
+
public static boolean isBellowThreshold(int credit, int pending, int
threshold) {
return credit <= threshold - pending;
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index 2ef953d3b3..dae8fe135c 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -18,13 +18,9 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.List;
import java.util.Objects;
-
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -43,14 +39,7 @@ import
org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMess
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
-import org.apache.qpid.proton.amqp.messaging.Modified;
-import org.apache.qpid.proton.amqp.messaging.Outcome;
-import org.apache.qpid.proton.amqp.messaging.Rejected;
-import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.DeliveryState;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Receiver;
@@ -235,14 +224,6 @@ public class ProtonServerReceiverContext extends
ProtonAbstractReceiver {
}
}
- public void deliveryFailed(Delivery delivery, Receiver receiver, Exception
e) {
- connection.runNow(() -> {
- delivery.disposition(determineDeliveryState(((Source)
receiver.getSource()), useModified, e));
- settle(delivery);
- connection.flush();
- });
- }
-
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
@@ -266,28 +247,6 @@ public class ProtonServerReceiverContext extends
ProtonAbstractReceiver {
return
Objects.requireNonNullElse(sessionSPI.getRoutingTypeFromPrefix(address,
sessionSPI.getDefaultRoutingType(address)),
ActiveMQDefaultConfiguration.getDefaultRoutingType());
}
- private static DeliveryState determineDeliveryState(final Source source,
final boolean useModified, final Exception e) {
- Outcome defaultOutcome = getEffectiveDefaultOutcome(source);
-
- if (isAddressFull(e) && useModified &&
- (outcomeSupported(source, Modified.DESCRIPTOR_SYMBOL) ||
defaultOutcome instanceof Modified)) {
- Modified modified = new Modified();
- modified.setDeliveryFailed(true);
- return modified;
- } else {
- if (outcomeSupported(source, Rejected.DESCRIPTOR_SYMBOL) ||
defaultOutcome instanceof Rejected) {
- return createRejected(e);
- } else if (source.getDefaultOutcome() instanceof DeliveryState) {
- return ((DeliveryState) source.getDefaultOutcome());
- } else {
- // The AMQP specification requires that Accepted is returned for
this case. However there exist
- // implementations that set neither outcomes/default-outcome but
use/expect for full range of outcomes.
- // To maintain compatibility with these implementations, we
maintain previous behaviour.
- return createRejected(e);
- }
- }
- }
-
private static RoutingType getExplicitRoutingType(Symbol[] symbols) {
if (symbols != null) {
for (Symbol symbol : symbols) {
@@ -300,37 +259,4 @@ public class ProtonServerReceiverContext extends
ProtonAbstractReceiver {
}
return null;
}
-
- private static Rejected createRejected(final Exception e) {
- ErrorCondition condition = new ErrorCondition();
-
- if (e instanceof ActiveMQSecurityException) {
- condition.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
- } else if (isAddressFull(e)) {
- condition.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
- } else {
- condition.setCondition(Symbol.valueOf("failed"));
- }
- condition.setDescription(e.getMessage());
-
- Rejected rejected = new Rejected();
- rejected.setError(condition);
-
- return rejected;
- }
-
- private static boolean isAddressFull(final Exception e) {
- return e instanceof ActiveMQException amqe &&
ActiveMQExceptionType.ADDRESS_FULL.equals(amqe.getType());
- }
-
- private static boolean outcomeSupported(final Source source, final Symbol
outcome) {
- if (source != null && source.getOutcomes() != null) {
- return Arrays.asList((source).getOutcomes()).contains(outcome);
- }
- return false;
- }
-
- private static Outcome getEffectiveDefaultOutcome(final Source source) {
- return (source.getOutcomes() == null || source.getOutcomes().length ==
0) ? source.getDefaultOutcome() : null;
- }
}
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
index b965c4a50e..e6cc880044 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContextTest.java
@@ -17,6 +17,9 @@
package org.apache.activemq.artemis.protocol.amqp.proton;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.Unpooled;
@@ -24,6 +27,7 @@ import
org.apache.activemq.artemis.api.core.ActiveMQAddressFullException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import
org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
@@ -34,11 +38,13 @@ import
org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport;
import
org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.util.NettyReadable;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
+import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Outcome;
import org.apache.qpid.proton.amqp.messaging.Rejected;
+import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.codec.ReadableBuffer;
@@ -179,7 +185,10 @@ public class ProtonServerReceiverContextTest {
verify(mockDelivery, times(1)).settle();
verify(mockReceiver, times(1)).getDrain();
- if (!drain) {
+ if (drain) {
+ verify(mockReceiver, times(1)).getQueued();
+ verify(mockReceiver, times(1)).getCredit();
+ } else {
verify(mockReceiver, times(1)).flow(1);
}
verifyNoMoreInteractions(mockReceiver);
@@ -243,6 +252,116 @@ public class ProtonServerReceiverContextTest {
assertEquals(900,
ProtonServerReceiverContext.calculatedUpdateRefill(2000, 1000, 100));
}
+ @Test
+ public void testStopDrainsOffCredit() throws Exception {
+ final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
+
+ try {
+ ActiveMQServer mockServer = mock(ActiveMQServer.class);
+ when(mockServer.getScheduledPool()).thenReturn(scheduler);
+
+ Receiver mockReceiver = mock(Receiver.class);
+ AMQPConnectionContext mockConnContext =
mock(AMQPConnectionContext.class);
+
+ when(mockConnContext.getAmqpCredits()).thenReturn(100);
+ when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
+
+
when(mockConnContext.getProtocolManager()).thenReturn(mock(ProtonProtocolManager.class));
+
+ AMQPSessionCallback mockSessionSpi = mock(AMQPSessionCallback.class);
+ when(mockSessionSpi.getStorageManager()).thenReturn(new
NullStorageManager());
+ when(mockSessionSpi.createStandardMessage(any(),
any())).thenAnswer((Answer<AMQPStandardMessage>) invocation -> new
AMQPStandardMessage(0, createAMQPMessageBuffer(), null, null));
+
+ AMQPSessionContext mockProtonContext = mock(AMQPSessionContext.class);
+ when(mockProtonContext.getServer()).thenReturn(mockServer);
+ when(mockProtonContext.getSessionSPI()).thenReturn(mockSessionSpi);
+
+ ProtonServerReceiverContext rc = new
ProtonServerReceiverContext(mockSessionSpi, mockConnContext, mockProtonContext,
mockReceiver);
+
+ AtomicBoolean stopped = new AtomicBoolean();
+
+ when(mockReceiver.getCredit()).thenReturn(100);
+
+ rc.stop(10_000, (rcvr, didStop) -> stopped.set(true));
+
+ assertFalse(stopped.get());
+
+ verify(mockReceiver, times(1)).drain(0);
+ verify(mockReceiver, times(1)).draining();
+ verify(mockReceiver, times(2)).getCredit();
+ verify(mockReceiver, times(1)).getQueued();
+ verify(mockReceiver, times(1)).drain(0);
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ @Test
+ public void testDrainInitiatedWhenDeliveryMarkedFailed() throws Exception {
+ final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(1);
+
+ try {
+ ActiveMQServer mockServer = mock(ActiveMQServer.class);
+ when(mockServer.getScheduledPool()).thenReturn(scheduler);
+
+ Source source = new Source();
+ source.setOutcomes(new Symbol[]{Accepted.DESCRIPTOR_SYMBOL,
Rejected.DESCRIPTOR_SYMBOL,
+ Modified.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL});
+
+ Receiver mockReceiver = mock(Receiver.class);
+ when(mockReceiver.getSource()).thenReturn(source);
+
+ ProtonProtocolManager mockProtocolManager =
mock(ProtonProtocolManager.class);
+ when(mockProtocolManager.getLinkQuiesceTimeout()).thenReturn(100);
+
when(mockProtocolManager.isDrainOnTransientDeliveryErrors()).thenReturn(true);
+
+ AMQPConnectionContext mockConnContext =
mock(AMQPConnectionContext.class);
+ when(mockConnContext.getAmqpCredits()).thenReturn(100);
+ when(mockConnContext.getAmqpLowCredits()).thenReturn(30);
+
when(mockConnContext.getProtocolManager()).thenReturn(mockProtocolManager);
+
+ doAnswer(invocation -> {
+ final Runnable target = invocation.getArgument(0, Runnable.class);
+ target.run();
+ return null;
+ }).when(mockConnContext).runNow(any(Runnable.class));
+
+ doAnswer(invocation -> {
+ final Runnable target = invocation.getArgument(0, Runnable.class);
+ target.run();
+ return null;
+ }).when(mockConnContext).runLater(any(Runnable.class));
+
+ AMQPSessionCallback mockSessionSpi = mock(AMQPSessionCallback.class);
+ when(mockSessionSpi.getStorageManager()).thenReturn(new
NullStorageManager());
+ when(mockSessionSpi.createStandardMessage(any(),
any())).thenAnswer((Answer<AMQPStandardMessage>) invocation -> new
AMQPStandardMessage(0, createAMQPMessageBuffer(), null, null));
+
+ AMQPSessionContext mockProtonContext = mock(AMQPSessionContext.class);
+ when(mockProtonContext.getServer()).thenReturn(mockServer);
+ when(mockProtonContext.getSessionSPI()).thenReturn(mockSessionSpi);
+
+ ProtonServerReceiverContext rc = new
ProtonServerReceiverContext(mockSessionSpi, mockConnContext, mockProtonContext,
mockReceiver);
+
+ when(mockReceiver.getCredit()).thenReturn(100);
+
+ Delivery mockDelivery = mock(Delivery.class);
+ when(mockDelivery.getLink()).thenReturn(mockReceiver);
+ when(mockReceiver.current()).thenReturn(mockDelivery);
+
+ rc.incrementSettle();
+ rc.deliveryFailed(mockDelivery, mockReceiver, new
ActiveMQAddressFullException("full"));
+
+ Wait.assertTrue(() -> rc.isClosed(), 5000, 50);
+
+ verify(mockReceiver, times(1)).drain(0);
+ verify(mockReceiver, times(2)).getCredit();
+ verify(mockReceiver, times(1)).drain(0);
+ verify(mockReceiver, times(1)).getSource();
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
private ReadableBuffer createAMQPMessageBuffer() {
MessageImpl message = (MessageImpl) Message.Factory.create();
message.setContentType(AMQPMessageSupport.OCTET_STREAM_CONTENT_TYPE);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDrainOnNoSpaceForSendsTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDrainOnNoSpaceForSendsTest.java
new file mode 100644
index 0000000000..9bd9a47a11
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDrainOnNoSpaceForSendsTest.java
@@ -0,0 +1,1091 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.qpid.proton.amqp.transport.LinkError;
+import org.apache.qpid.protonj2.test.driver.ProtonTestClient;
+import org.junit.jupiter.api.Test;
+
+public class AmqpDrainOnNoSpaceForSendsTest extends AmqpClientTestSupport {
+
+ @Override
+ protected String getConfiguredProtocols() {
+ return "AMQP";
+ }
+
+ @Override
+ protected void configureAMQPAcceptorParameters(TransportConfiguration tc) {
+ tc.getParams().put("amqpCredits", "3");
+ tc.getParams().put("amqpLowCredits", "1");
+ }
+
+ @Override
+ protected void configureAddressPolicy(ActiveMQServer server) {
+ AddressSettings addressSettings =
server.getAddressSettingsRepository().getMatch("#");
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings.setMaxSizeBytes(500);
+ server.getAddressSettingsRepository().addMatch("#", addressSettings);
+ }
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ // Creates the broker used to make the outgoing connection. The port
passed is for
+ // that brokers acceptor. The test server connected to by the broker
binds to a random port.
+ return createServer(AMQP_PORT, false);
+ }
+
+ @Test
+ public void testDrainCreditOnClientSendFailForMulticastAddress() throws
Exception {
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender").now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().rejected();
+ peer.expectDetach();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+ .withBody().withString("Third Message: ")
+ .also()
+ .withDeliveryId(3)
+ .later(10);
+ peer.remoteDetach().later(15);
+
+ // Should be no new flow since the address remains full
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testDrainCreditOnClientSendFailForAnyCastAddress() throws
Exception {
+ server.start();
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(getTestName())).isExists(), 5000, 100);
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender").now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().withAddress(getTestName())
+ .withCapabilities("queue").also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().rejected();
+ peer.expectDetach();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+ .withBody().withString("Third Message: ")
+ .also()
+ .withDeliveryId(3)
+ .later(10);
+ peer.remoteDetach().later(15);
+
+ // Should be no new flow since the address remains full
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void
testDoesDrainCreditOnClientSendFailForAnonymousRelaySenderForMatchedAddressPolicy()
throws Exception {
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender")
+
.withDesiredCapabilities(AmqpSupport.ANONYMOUS_RELAY.toString()).now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+ final Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+ final Queue queue2 = server.locateQueue(SimpleString.of("queue2"));
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+ .withBody().withString("Third Message: ")
+ .also()
+ .withDeliveryId(3)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(3);
+
+ // Make capacity again which should now replenish credit.
+ queue1.deleteQueue();
+ queue2.deleteQueue();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().accepted();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "4").also()
+ .withBody().withString("Fourth Message: ")
+ .also()
+ .withDeliveryId(4)
+ .later(10);
+
+ // Should be no new flow since the address remains full
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void
testDrainedAnonymousRelaySenderCanBurnCreditSendingToOtherAddresses() throws
Exception {
+ final String queueNameA = getTestName() + "-A";
+ final String queueNameB = getTestName() + "-B";
+
+ AddressSettings addressSettings1 =
server.getAddressSettingsRepository().getMatch(queueNameA);
+
addressSettings1.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings1.setMaxSizeBytes(500);
+ server.getAddressSettingsRepository().addMatch(queueNameA,
addressSettings1);
+
+ AddressSettings addressSettings2 =
server.getAddressSettingsRepository().getMatch(queueNameB);
+
addressSettings2.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings2.setMaxSizeBytes(500);
+ server.getAddressSettingsRepository().addMatch(queueNameB,
addressSettings2);
+
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender")
+
.withDesiredCapabilities(AmqpSupport.ANONYMOUS_RELAY.toString()).now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of(queueNameA).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(queueNameA)
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of(queueNameB).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(queueNameB)
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(queueNameA)).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(queueNameB)).isExists(), 5000, 100);
+
+ final Queue queue1 = server.locateQueue(SimpleString.of(queueNameA));
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameA).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameA).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().accepted();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameB).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+ .withBody().withString("Third Message: ")
+ .also()
+ .withDeliveryId(3)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(3);
+
+ // Make capacity again which should now replenish credit.
+ queue1.deleteQueue();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().accepted();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameA).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "4").also()
+ .withBody().withString("Fourth Message: ")
+ .also()
+ .withDeliveryId(4)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void
testDrainedAnonymousRelaySenderCreditNotReplenishedOnBurnOfExistingCredit()
throws Exception {
+ final String queueNameA = getTestName() + "-A";
+ final String queueNameB = getTestName() + "-B";
+
+ AddressSettings addressSettings1 =
server.getAddressSettingsRepository().getMatch(queueNameA);
+
addressSettings1.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings1.setMaxSizeBytes(500);
+ server.getAddressSettingsRepository().addMatch(queueNameA,
addressSettings1);
+
+ AddressSettings addressSettings2 =
server.getAddressSettingsRepository().getMatch(queueNameB);
+
addressSettings2.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings2.setMaxSizeBytes(500);
+ server.getAddressSettingsRepository().addMatch(queueNameB,
addressSettings2);
+
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
peer.expectOpen().withOfferedCapability(AmqpSupport.ANONYMOUS_RELAY.toString());
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender")
+
.withDesiredCapabilities(AmqpSupport.ANONYMOUS_RELAY.toString()).now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of(queueNameA).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(queueNameA)
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of(queueNameB).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(queueNameB)
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(queueNameA)).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(queueNameB)).isExists(), 5000, 100);
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameA).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameA).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().accepted();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameB).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+ .withBody().withString("Third Message: ")
+ .also()
+ .withDeliveryId(3)
+ .later(10);
+
+ // Create some traffic to ensure we don't see unexpected flow frames
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().respond();
+ peer.remoteAttach().ofReceiver()
+ .withName("receiving-peer")
+ .withSource().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withTarget().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach().respond();
+ peer.remoteDetach().later(1);
+
+ // Should be no new flow since the address remains full
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void
testDrainAnonymousRelayOnceBlockPolicyHitsRejectMaxSizeThreshold() throws
Exception {
+ final String queueNameA = getTestName() + "A";
+ final String queueNameB = getTestName() + "B";
+
+ AddressSettings addressSettings1 =
server.getAddressSettingsRepository().getMatch(queueNameA);
+
addressSettings1.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ addressSettings1.setMaxSizeBytes(500);
+ addressSettings1.setMaxSizeBytesRejectThreshold(1000);
+ server.getAddressSettingsRepository().addMatch(queueNameA,
addressSettings1);
+
+ AddressSettings addressSettings2 =
server.getAddressSettingsRepository().getMatch(queueNameB);
+
addressSettings2.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ addressSettings2.setMaxSizeBytes(500);
+ addressSettings2.setMaxSizeBytesRejectThreshold(1000);
+ server.getAddressSettingsRepository().addMatch(queueNameB,
addressSettings2);
+
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
peer.expectOpen().withOfferedCapability(AmqpSupport.ANONYMOUS_RELAY.toString());
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender")
+
.withDesiredCapabilities(AmqpSupport.ANONYMOUS_RELAY.toString()).now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of(queueNameA).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(queueNameA)
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of(queueNameB).setRoutingType(RoutingType.MULTICAST)
+
.setAddress(queueNameB)
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(queueNameA)).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of(queueNameB)).isExists(), 5000, 100);
+
+ Queue queue1 = server.locateQueue(SimpleString.of(queueNameA));
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(1024);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameA).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(queueNameA).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: " +
payload)
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ // Make capacity again which should now replenish credit once remote
send drain response.
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender();
+ peer.expectDetach();
+ peer.remoteAttach().ofReceiver()
+ .withName("receiving-peer")
+ .withSource().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withTarget().also()
+ .now();
+ peer.remoteDetach().later(1);
+
+ queue1.deleteQueue();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+ // Respond drained with one credit outstanding, remote should then
grant a new batch
+
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(3).withDrain(true).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void
testDoesNotDrainCreditOnClientSendFailIfAcceptorConfiguredNotTo() throws
Exception {
+ server.getConfiguration().getAcceptorConfigurations().clear();
+ server.getConfiguration().addAcceptorConfiguration("server",
+ "tcp://localhost:" + AMQP_PORT +
"?amqpCredits=3&amqpLowCredits=0&amqpDrainOnTransientDeliveryErrors=false");
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender").now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+ final Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+ final Queue queue2 = server.locateQueue(SimpleString.of("queue2"));
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "3").also()
+ .withBody().withString("Third Message: ")
+ .also()
+ .withDeliveryId(3)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(3);
+
+ // Make capacity again which should now replenish credit.
+ queue1.deleteQueue();
+ queue2.deleteQueue();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDisposition().withState().accepted();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "4").also()
+ .withBody().withString("Fourth Message: ")
+ .also()
+ .withDeliveryId(4)
+ .later(10);
+
+ // Should be no new flow since the address remains full
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void
testDrainCreditOnClientSendFailPeerAnsweresDrainedButFlowsOnlyAfterSpaceCleared()
throws Exception {
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender").now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+ final Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+ final Queue queue2 = server.locateQueue(SimpleString.of("queue2"));
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(3).withDrain(true).now();
+
+ // The address should still be full so nothing expected until the
queues are deleted
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+ // Make capacity again which should now replenish credit.
+ queue1.deleteQueue();
+ queue2.deleteQueue();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void
testDrainCreditOnClientSendFailNoNewCreditUntilPeerAnswersDrained() throws
Exception {
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender").now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+
+ Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ // Make capacity again which should now replenish credit once remote
send drain response.
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ queue1.deleteQueue();
+
+ peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+ // Respond drained with one credit outstanding, remote should then
grant a new batch
+
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(3).withDrain(true).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testSenderClosedIfRemoteTimesOutWaitingForDrainToComplete()
throws Exception {
+ server.getConfiguration().getAcceptorConfigurations().clear();
+ server.getConfiguration().addAcceptorConfiguration("server",
+ "tcp://localhost:" + AMQP_PORT +
"?amqpCredits=3&amqpLowCredits=0&amqpLinkQuiesceTimeout=1");
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender").now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
server.createQueue(QueueConfiguration.of("queue2").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue2")).isExists(), 5000, 100);
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(100);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+ peer.expectDetach().withError(LinkError.DETACH_FORCED.toString());
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+ .withProperties().withTo(getTestName()).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ // Create some traffic to ensure we don't see unexpected flow frames
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender().respond();
+ peer.remoteAttach().ofReceiver()
+ .withName("receiving-peer")
+ .withSource().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withTarget().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectDetach().respond();
+ peer.remoteDetach().later(1);
+
+ // Should be no new flow since the address remains full
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+
+ @Test
+ public void testDrainOnceBlockPolicyHitsRejectMaxSizeThreshold() throws
Exception {
+ AddressSettings addressSettings =
server.getAddressSettingsRepository().getMatch("#");
+
addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ addressSettings.setMaxSizeBytes(500);
+ addressSettings.setMaxSizeBytesRejectThreshold(1000);
+ server.getAddressSettingsRepository().addMatch("#", addressSettings);
+ server.start();
+
+ try (ProtonTestClient peer = new ProtonTestClient()) {
+ peer.queueClientSaslAnonymousConnect();
+ peer.connect("localhost", AMQP_PORT);
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+ peer.expectOpen();
+ peer.expectBegin();
+ peer.expectAttach().ofReceiver();
+ peer.expectFlow().withLinkCredit(3);
+
+ peer.remoteOpen().withContainerId("test-sender").now();
+ peer.remoteBegin().now();
+ peer.remoteAttach().ofSender()
+ .withInitialDeliveryCount(0)
+ .withName("sending-peer")
+ .withTarget().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withSource().also()
+ .now();
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+
+
server.createQueue(QueueConfiguration.of("queue1").setRoutingType(RoutingType.MULTICAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ Wait.assertTrue(() ->
server.queueQuery(SimpleString.of("queue1")).isExists(), 5000, 100);
+
+ Queue queue1 = server.locateQueue(SimpleString.of("queue1"));
+
+ peer.expectDisposition().withState().accepted();
+
+ final String payload = "A".repeat(1024);
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "1").also()
+ .withBody().withString("First Message: " +
payload)
+ .also()
+ .withDeliveryId(1)
+ .now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(1).withDrain(true);
+ peer.expectDisposition().withState().rejected();
+
+ peer.remoteTransfer().withHeader().withDurability(true).also()
+
.withApplicationProperties().withProperty("color", "red").also()
+
.withMessageAnnotations().withAnnotation("x-opt-test", "2").also()
+ .withBody().withString("Second Message: ")
+ .also()
+ .withDeliveryId(2)
+ .later(10);
+
+ // Make capacity again which should now replenish credit once remote
sends drain response.
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofSender();
+ peer.expectDetach();
+ peer.remoteAttach().ofReceiver()
+ .withName("receiving-peer")
+ .withSource().withAddress(getTestName())
+ .withCapabilities("topic").also()
+ .withTarget().also()
+ .now();
+ peer.remoteDetach().later(1);
+
+ queue1.deleteQueue();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+ // Respond drained with one credit outstanding, remote should then
grant a new batch
+
peer.remoteFlow().withLinkCredit(0).withDeliveryCount(3).withDrain(true).now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectClose();
+ peer.remoteClose().now();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+ }
+}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
index cb38db93ce..542f2c81e6 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationAddressPolicyTest.java
@@ -6319,6 +6319,12 @@ public class AMQPFederationAddressPolicyTest extends
AmqpClientTestSupport {
peer.expectDisposition().withState().accepted(); // This should fill
the address
+ // In either configuration we drain the link credit to avoid repeated
sends of the failed
+ // deliveries where possible.
+ peer.expectFlow().withDrain(true).withLinkCredit(998)
+ .respond()
+
.withDrain(true).withLinkCredit(0).withDeliveryCount(998).afterDelay(5);
+
// If sending modified the remote won't discard the message, it will
send it again
if (useModifiedForReject) {
peer.expectDisposition().withState().modified(true);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
index fa09db9be8..2c08c2a79c 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPFederationQueuePolicyTest.java
@@ -4990,6 +4990,108 @@ public class AMQPFederationQueuePolicyTest extends
AmqpClientTestSupport {
}
}
+ @Test
+ @Timeout(20)
+ public void
testFederationGrantsCreditOnNewReceiverAfterLinkQuiescedByMessagesConsumingDrainedCredit()
throws Exception {
+ try (ProtonTestServer peer = new ProtonTestServer()) {
+ peer.expectSASLAnonymousConnect();
+ peer.expectOpen().respond();
+ peer.expectBegin().respond();
+ peer.expectAttach().ofSender()
+
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
+ .respondInKind();
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(10);
+ peer.start();
+
+ final URI remoteURI = peer.getServerURI();
+ logger.info("Connect test started, peer listening on: {}", remoteURI);
+
+ final AMQPFederationQueuePolicyElement receiveFromQueue = new
AMQPFederationQueuePolicyElement();
+ receiveFromQueue.setName("queue-policy");
+ receiveFromQueue.addToIncludes(getTestName(), getTestName());
+ receiveFromQueue.addProperty(RECEIVER_QUIESCE_TIMEOUT, 10_000);
+ receiveFromQueue.addProperty(QUEUE_RECEIVER_IDLE_TIMEOUT, 10_000);
+ receiveFromQueue.addProperty("amqpCredits", "3");
+ receiveFromQueue.addProperty("amqpCreditsLow", "0");
+
+ final AMQPFederatedBrokerConnectionElement element = new
AMQPFederatedBrokerConnectionElement();
+ element.setName(getTestName());
+ element.addLocalQueuePolicy(receiveFromQueue);
+
+ final AMQPBrokerConnectConfiguration amqpConnection =
+ new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" +
remoteURI.getHost() + ":" + remoteURI.getPort());
+ amqpConnection.setReconnectAttempts(0);// No reconnects
+ amqpConnection.addElement(element);
+
+ server.getConfiguration().addAMQPConnection(amqpConnection);
+ server.start();
+
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
+
.setAddress(getTestName())
+
.setAutoCreated(false));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectAttach().ofReceiver()
+
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
+ .withName(allOf(containsString(getTestName()),
+ containsString("queue-receiver"),
+
containsString(server.getNodeID().toString())))
+ .respondInKind();
+ peer.expectFlow().withLinkCredit(3);
+
+ final ConnectionFactory factory =
CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ final Session session =
connection.createSession(Session.AUTO_ACKNOWLEDGE);
+ final MessageConsumer consumer =
session.createConsumer(session.createQueue(getTestName()));
+
+ connection.start();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ // Demand is removed so expect a drain, respond to drain to
quiesce the link
+ // which should leave it idling and ready for recovery by next
consumer.
+ peer.expectFlow().withLinkCredit(3).withDrain(true);
+
+ // Burn off credit with messages instead of a flow with drained.
+
peer.remoteTransfer().withBody().withString("test-message-1").also()
+ .withHeader().withDurability(true).also()
+ .withApplicationProperties().also()
+ .withMessageAnnotations().also()
+ .withDeliveryId(0)
+ .queue();
+ peer.expectDisposition().withState().accepted();
+
peer.remoteTransfer().withBody().withString("test-message-2").also()
+ .withHeader().withDurability(true).also()
+ .withApplicationProperties().also()
+ .withMessageAnnotations().also()
+ .withDeliveryId(1)
+ .queue();
+ peer.expectDisposition().withState().accepted();
+
peer.remoteTransfer().withBody().withString("test-message-3").also()
+ .withHeader().withDurability(true).also()
+ .withApplicationProperties().also()
+ .withMessageAnnotations().also()
+ .withDeliveryId(2)
+ .queue();
+ peer.expectDisposition().withState().accepted();
+
+ consumer.close();
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ peer.expectFlow().withLinkCredit(3).withDeliveryCount(3);
+
+ // New demand added before any quiesce or idle timeout can trigger
+ session.createConsumer(session.createQueue(getTestName()));
+
+ peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
+ }
+
+ peer.close();
+ }
+ }
+
private static void sendQueueAddedEvent(ProtonTestPeer peer, String
address, String queue, int handle, int deliveryId) {
final Map<String, Object> eventMap = new LinkedHashMap<>();
eventMap.put(REQUESTED_ADDRESS_NAME, address);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact