This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new e47eb5a ARTEMIS-589 flow control for individual STOMP subscribers
new 400623a This closes #3258
e47eb5a is described below
commit e47eb5ae20f18c7cb4fa52d39c7015131a91e3dd
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Sep 11 10:43:34 2020 -0500
ARTEMIS-589 flow control for individual STOMP subscribers
---
.../remoting/impl/netty/TransportConstants.java | 6 +-
.../artemis/core/protocol/stomp/Stomp.java | 7 +
.../core/protocol/stomp/StompConnection.java | 15 +-
.../artemis/core/protocol/stomp/StompFrame.java | 17 ++-
.../core/protocol/stomp/StompProtocolManager.java | 15 +-
.../artemis/core/protocol/stomp/StompSession.java | 54 ++++---
.../core/protocol/stomp/StompSubscription.java | 18 +--
.../protocol/stomp/VersionedStompFrameHandler.java | 8 +-
.../artemis/core/server/ServerConsumer.java | 2 +-
.../artemis/core/server/ServerSession.java | 2 +-
.../core/server/impl/ServerConsumerImpl.java | 10 +-
.../core/server/impl/ServerSessionImpl.java | 9 +-
docs/user-manual/en/stomp.md | 46 ++++++
.../tests/integration/cli/DummyServerConsumer.java | 4 +-
.../tests/integration/stomp/StompTestBase.java | 41 ++++-
.../tests/integration/stomp/v12/StompV12Test.java | 169 +++++++++++++++++++++
16 files changed, 357 insertions(+), 66 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 018b2ba..8bc25a9 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -165,9 +165,12 @@ public class TransportConstants {
public static final String CLUSTER_CONNECTION = "clusterConnection";
+ @Deprecated
public static final String STOMP_CONSUMERS_CREDIT = "stompConsumerCredits";
- public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K
+ public static final String STOMP_CONSUMER_WINDOW_SIZE =
"stompConsumerWindowSize";
+
+ public static final int STOMP_DEFAULT_CONSUMER_WINDOW_SIZE = 10 * 1024; //
10K
public static final String PROXY_ENABLED_PROP_NAME = "socksEnabled";
@@ -396,6 +399,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
+ allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMER_WINDOW_SIZE);
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED);
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index fa02ee3..c965302 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -157,6 +157,13 @@ public interface Stomp {
*/
String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME =
"activemq.subscriptionName";
+ /**
+ * Backwards compatibility for STOMP clients that were using 5.x
+ */
+ String ACTIVEMQ_PREFETCH_SIZE = "activemq.prefetchSize";
+
+ String CONSUMER_WINDOW_SIZE = "consumer-window-size";
+
String SUBSCRIPTION_TYPE = "subscription-type";
String NO_LOCAL = "no-local";
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 56ab51a..4fff4e8 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -723,12 +723,13 @@ public final class StompConnection implements
RemotingConnection {
}
StompPostReceiptFunction subscribe(String destination,
- String selector,
- String ack,
- String id,
- String durableSubscriptionName,
- boolean noLocal,
- RoutingType subscriptionType) throws ActiveMQStompException {
+ String selector,
+ String ack,
+ String id,
+ String durableSubscriptionName,
+ boolean noLocal,
+ RoutingType subscriptionType,
+ Integer consumerWindowSize) throws
ActiveMQStompException {
autoCreateDestinationIfPossible(destination, subscriptionType);
checkDestination(destination);
checkRoutingSemantics(destination, subscriptionType);
@@ -756,7 +757,7 @@ public final class StompConnection implements
RemotingConnection {
}
try {
- return manager.subscribe(this, subscriptionID,
durableSubscriptionName, destination, selector, ack, noLocal);
+ return manager.subscribe(this, subscriptionID,
durableSubscriptionName, destination, selector, ack, noLocal,
consumerWindowSize);
} catch (ActiveMQStompException e) {
throw e;
} catch (Exception e) {
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java
index 1ba5d38..aa921dd 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java
@@ -77,12 +77,19 @@ public class StompFrame {
@Override
public String toString() {
- return new StringBuilder()
+ StringBuilder result = new StringBuilder()
.append("StompFrame[command=").append(command)
- .append(", headers=").append(headers)
- .append(", content= ").append(this.body)
- .append(", bytes= ").append(Arrays.toString(bytesBody))
- .toString();
+ .append(", headers=").append(headers);
+
+ if (command.equals(Stomp.Responses.MESSAGE) ||
command.equals(Stomp.Responses.ERROR) || command.equals(Stomp.Commands.SEND)) {
+ result.append(", body=").append(this.getBody())
+ .append(", body-bytes=").append(Arrays.toString(bytesBody))
+ .append(", size=").append(size);
+ }
+
+ result.append("]");
+
+ return result.toString();
}
public boolean isPing() {
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 7c7caed..8546f9a 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -341,12 +341,13 @@ public class StompProtocolManager extends
AbstractProtocolManager<StompFrame, St
// Inner classes -------------------------------------------------
public StompPostReceiptFunction subscribe(StompConnection connection,
- String subscriptionID,
- String durableSubscriptionName,
- String destination,
- String selector,
- String ack,
- boolean noLocal) throws Exception {
+ String subscriptionID,
+ String durableSubscriptionName,
+ String destination,
+ String selector,
+ String ack,
+ boolean noLocal,
+ Integer consumerWindowSize)
throws Exception {
StompSession stompSession = getSession(connection);
stompSession.setNoLocal(noLocal);
if (stompSession.containsSubscription(subscriptionID)) {
@@ -354,7 +355,7 @@ public class StompProtocolManager extends
AbstractProtocolManager<StompFrame, St
". Either use unique subscription IDs or do not create multiple
subscriptions for the same destination");
}
long consumerID = server.getStorageManager().generateID();
- return stompSession.addSubscription(consumerID, subscriptionID,
connection.getClientID(), durableSubscriptionName, destination, selector, ack);
+ return stompSession.addSubscription(consumerID, subscriptionID,
connection.getClientID(), durableSubscriptionName, destination, selector, ack,
consumerWindowSize);
}
public void unsubscribe(StompConnection connection,
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 786a114..7aba86e 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.stomp;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -71,13 +72,10 @@ public class StompSession implements SessionCallback {
private volatile boolean noLocal = false;
- private final int consumerCredits;
-
StompSession(final StompConnection connection, final StompProtocolManager
manager, OperationContext sessionContext) {
this.connection = connection;
this.manager = manager;
this.sessionContext = sessionContext;
- this.consumerCredits =
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT,
TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT,
connection.getAcceptorUsed().getConfiguration());
}
@Override
@@ -216,14 +214,13 @@ public class StompSession implements SessionCallback {
public void acknowledge(String messageID, String subscriptionID) throws
Exception {
long id = Long.parseLong(messageID);
- Pair<Long, Integer> pair = messagesToAck.remove(id);
+ Pair<Long, Integer> pair = messagesToAck.get(id);
if (pair == null) {
throw
BUNDLE.failToAckMissingID(id).setHandler(connection.getFrameHandler());
}
long consumerID = pair.getA();
- int credits = pair.getB();
StompSubscription sub = subscriptions.get(consumerID);
@@ -233,30 +230,45 @@ public class StompSession implements SessionCallback {
}
}
- if (this.consumerCredits != -1) {
- session.receiveConsumerCredits(consumerID, credits);
- }
-
if
(sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)) {
session.individualAcknowledge(consumerID, id);
+
+ if (sub.getConsumerWindowSize() != -1) {
+ session.receiveConsumerCredits(consumerID,
messagesToAck.remove(id).getB());
+ }
} else {
- session.acknowledge(consumerID, id);
+ List<Long> ackedRefs = session.acknowledge(consumerID, id);
+
+ if (sub.getConsumerWindowSize() != -1) {
+ for (Long ackedID : ackedRefs) {
+ session.receiveConsumerCredits(consumerID,
messagesToAck.remove(ackedID).getB());
+ }
+ }
}
session.commit();
}
public StompPostReceiptFunction addSubscription(long consumerID,
- String subscriptionID,
- String clientID,
- String durableSubscriptionName,
- String destination,
- String selector,
- String ack) throws Exception {
+ String subscriptionID,
+ String clientID,
+ String
durableSubscriptionName,
+ String destination,
+ String selector,
+ String ack,
+ Integer consumerWindowSize)
throws Exception {
SimpleString address = SimpleString.toSimpleString(destination);
SimpleString queueName = SimpleString.toSimpleString(destination);
SimpleString selectorSimple = SimpleString.toSimpleString(selector);
- final int receiveCredits =
ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;
+ final int finalConsumerWindowSize;
+
+ if (consumerWindowSize != null) {
+ finalConsumerWindowSize = consumerWindowSize;
+ } else if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
+ finalConsumerWindowSize = -1;
+ } else {
+ finalConsumerWindowSize =
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMER_WINDOW_SIZE,
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT,
TransportConstants.STOMP_DEFAULT_CONSUMER_WINDOW_SIZE,
connection.getAcceptorUsed().getConfiguration()),
connection.getAcceptorUsed().getConfiguration());
+ }
Set<RoutingType> routingTypes =
manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
boolean multicast = routingTypes.size() == 1 &&
routingTypes.contains(RoutingType.MULTICAST);
@@ -281,10 +293,14 @@ public class StompSession implements SessionCallback {
}
}
final ServerConsumer consumer = session.createConsumer(consumerID,
queueName, multicast ? null : selectorSimple, false, false, 0);
- StompSubscription subscription = new StompSubscription(subscriptionID,
ack, queueName, multicast);
+ StompSubscription subscription = new StompSubscription(subscriptionID,
ack, queueName, multicast, finalConsumerWindowSize);
subscriptions.put(consumerID, subscription);
session.start();
- return () -> consumer.receiveCredits(receiveCredits);
+ /*
+ * If the consumerWindowSize is 0 then we need to supply at least 1
credit otherwise messages will *never* flow.
+ * See
org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl#startSlowConsumer()
+ */
+ return () -> consumer.receiveCredits(finalConsumerWindowSize == 0 ? 1 :
finalConsumerWindowSize);
}
public boolean unsubscribe(String id, String durableSubscriptionName,
String clientID) throws Exception {
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
index de6044b..394a15c 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
@@ -19,9 +19,6 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import org.apache.activemq.artemis.api.core.SimpleString;
public class StompSubscription {
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
private final String subID;
@@ -32,19 +29,16 @@ public class StompSubscription {
// whether or not this subscription follows multicast semantics (e.g. for a
JMS topic)
private final boolean multicast;
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
+ private final int consumerWindowSize;
- public StompSubscription(String subID, String ack, SimpleString queueName,
boolean multicast) {
+ public StompSubscription(String subID, String ack, SimpleString queueName,
boolean multicast, int consumerWindowSize) {
this.subID = subID;
this.ack = ack;
this.queueName = queueName;
this.multicast = multicast;
+ this.consumerWindowSize = consumerWindowSize;
}
- // Public --------------------------------------------------------
-
public String getAck() {
return ack;
}
@@ -61,9 +55,13 @@ public class StompSubscription {
return multicast;
}
+ public int getConsumerWindowSize() {
+ return consumerWindowSize;
+ }
+
@Override
public String toString() {
- return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName="
+ queueName + ", multicast=" + multicast + "]";
+ return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName="
+ queueName + ", multicast=" + multicast + ", consumerWindowSize=" +
consumerWindowSize + "]";
}
}
diff --git
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 682d590..2c64eb2 100644
---
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -274,7 +274,13 @@ public abstract class VersionedStompFrameHandler {
} else if (frame.hasHeader(Stomp.Headers.Subscribe.ACTIVEMQ_NO_LOCAL)) {
noLocal =
Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
}
- return connection.subscribe(destination, selector, ack, id,
durableSubscriptionName, noLocal, routingType);
+ Integer consumerWindowSize = null;
+ if (frame.hasHeader(Headers.Subscribe.CONSUMER_WINDOW_SIZE)) {
+ consumerWindowSize =
Integer.parseInt(frame.getHeader(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE));
+ } else if (frame.hasHeader(Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE)) {
+ consumerWindowSize =
Integer.parseInt(frame.getHeader(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE));
+ }
+ return connection.subscribe(destination, selector, ack, id,
durableSubscriptionName, noLocal, routingType, consumerWindowSize);
}
public String getDestination(StompFrame request) throws Exception {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 0c9c5bf..5236cae 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -94,7 +94,7 @@ public interface ServerConsumer extends Consumer,
ConsumerInfo {
Object
protocolDataStart,
Object
protocolDataEnd);
- void acknowledge(Transaction tx, long messageID) throws Exception;
+ List<Long> acknowledge(Transaction tx, long messageID) throws Exception;
void individualAcknowledge(Transaction tx, long messageID) throws Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index aff59ec..b319cda 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -62,7 +62,7 @@ public interface ServerSession extends SecurityAuth {
boolean removeConsumer(long consumerID) throws Exception;
- void acknowledge(long consumerID, long messageID) throws Exception;
+ List<Long> acknowledge(long consumerID, long messageID) throws Exception;
void individualAcknowledge(long consumerID, long messageID) throws
Exception;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index e20c1e6..7f71970 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -888,11 +888,13 @@ public class ServerConsumerImpl implements
ServerConsumer, ReadyListener {
}
@Override
- public synchronized void acknowledge(Transaction tx, final long messageID)
throws Exception {
+ public synchronized List<Long> acknowledge(Transaction tx, final long
messageID) throws Exception {
if (browseOnly) {
- return;
+ return null;
}
+ List<Long> ackedRefs = null;
+
// Acknowledge acknowledges all refs delivered by the consumer up to and
including the one explicitly
// acknowledged
@@ -909,6 +911,7 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
try {
MessageReference ref;
+ ackedRefs = new ArrayList<>();
do {
synchronized (lock) {
ref = deliveringRefs.poll();
@@ -925,6 +928,7 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
}
ref.acknowledge(tx, this);
+ ackedRefs.add(ref.getMessageID());
acks++;
}
@@ -950,6 +954,8 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
}
throw activeMQIllegalStateException;
}
+
+ return ackedRefs;
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 454681a..bc29d20 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1191,8 +1191,9 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
}
@Override
- public void acknowledge(final long consumerID, final long messageID) throws
Exception {
+ public List<Long> acknowledge(final long consumerID, final long messageID)
throws Exception {
ServerConsumer consumer = findConsumer(consumerID);
+ List<Long> ackedRefs = null;
if (tx != null && tx.getState() == State.ROLLEDBACK) {
// JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we
will just
@@ -1200,7 +1201,7 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
// The tx has already timed out, so we need to ack and rollback
immediately
Transaction newTX = newTransaction();
try {
- consumer.acknowledge(newTX, messageID);
+ ackedRefs = consumer.acknowledge(newTX, messageID);
} catch (Exception e) {
// just ignored
// will log it just in case
@@ -1209,8 +1210,10 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
}
newTX.rollback();
} else {
- consumer.acknowledge(autoCommitAcks ? null : tx, messageID);
+ ackedRefs = consumer.acknowledge(autoCommitAcks ? null : tx,
messageID);
}
+
+ return ackedRefs;
}
@Override
diff --git a/docs/user-manual/en/stomp.md b/docs/user-manual/en/stomp.md
index 6076117..fe95c04 100644
--- a/docs/user-manual/en/stomp.md
+++ b/docs/user-manual/en/stomp.md
@@ -367,3 +367,49 @@ parameter on the acceptor.
The `stomp-websockets` example shows how to configure an Apache ActiveMQ
Artemis broker to have web browsers and Java applications exchanges messages.
+
+## Flow Control
+
+STOMP clients can use the `consumer-window-size` header on the `SUBSCRIBE`
+frame to control the flow of messages to clients. This is broadly discussed in
+the [Flow Control](flow-control.md) chapter.
+
+This ability is similar to the `activemq.prefetchSize` header supported by
+ActiveMQ 5.x. However, that header specifies the size in terms of *messages*
+whereas `consumer-window-size` specifies the size in terms of *bytes*. ActiveMQ
+Artemis supports the `activemq.prefetchSize` header for backwards compatibility
+but the value will be interpreted as *bytes* just like `consumer-window-size`
+would be. If both `activemq.prefetchSize` and `consumer-window-size` are set
+then the value for `consumer-window-size` will be used.
+
+Setting `consumer-window-size` to `0` will ensure that once a STOMP client
+receives a message it will *not* receive another one until it sends the
+appropriate `ACK` or `NACK` frame for the message it already has.
+
+Setting `consumer-window-size` to a value *greater than* `0` will allow it to
+receive messages until the cumulative bytes of those messages reaches the
+configured size. Once that happens the client will not receive any more
+messages until it sends the appropriate `ACK` or `NACK` frame for the messages
+it already has.
+
+Setting `consumer-window-size` to `-1` means there is no flow control and the
+broker will dispatch messages to clients as fast as it can.
+
+Flow control can be configured at the `acceptor` as well using the
+`stompConsumerWindowSize` URL parameter. This value is `10240` (i.e. 10K) by
+default for clients using `client` and `client-individual` acknowledgement
+modes. It is `-1` for clients using the `auto` acknowledgement mode. Even
+if `stompConsumerWindowSize` is set on the STOMP `acceptor` it will be
+overridden by the value provided by individual clients using the
+`consumer-window-size` header on their `SUBSCRIBE` frame.
+
+> **Note:**
+>
+> The `stompConsumerWindowSize` URL parameter used to be called
+> `stompConsumerCredits` but was changed to be more consistent with the new
+> header name (i.e. `consumer-window-size`). The `stompConsumerCredits`
+> parameter is deprecated but it will still work for the time being.
+
+Using the [DEBUG logging](#logging) mentioned earlier it is possible to see the
+size of the `MESSAGE` frames dispatched to clients. This can help when trying
+to determine the best `consumer-window-size` setting.
\ No newline at end of file
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 9858357..e594ee8 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -145,8 +145,8 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
- public void acknowledge(Transaction tx, long messageID) throws Exception {
-
+ public List<Long> acknowledge(Transaction tx, long messageID) throws
Exception {
+ return null;
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 27bfc00..60f552e 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -178,7 +178,7 @@ public abstract class StompTestBase extends
ActiveMQTestBase {
* @throws Exception
*/
protected ActiveMQServer createServer() throws Exception {
- String stompAcceptorURI = "tcp://" + TransportConstants.DEFAULT_HOST +
":" + TransportConstants.DEFAULT_STOMP_PORT + "?" +
TransportConstants.STOMP_CONSUMERS_CREDIT + "=-1";
+ String stompAcceptorURI = "tcp://" + TransportConstants.DEFAULT_HOST +
":" + TransportConstants.DEFAULT_STOMP_PORT + "?" +
TransportConstants.STOMP_CONSUMER_WINDOW_SIZE + "=-1";
if (isEnableStompMessageId()) {
stompAcceptorURI += ";" + TransportConstants.STOMP_ENABLE_MESSAGE_ID
+ "=true";
}
@@ -391,12 +391,36 @@ public abstract class StompTestBase extends
ActiveMQTestBase {
}
public static ClientStompFrame subscribe(StompClientConnection conn,
- String subscriptionId,
- String ack,
- String durableId,
- String selector,
- String destination,
- boolean receipt) throws IOException,
InterruptedException {
+ String subscriptionId,
+ String ack,
+ String durableId,
+ String selector,
+ String destination,
+ boolean receipt) throws
IOException, InterruptedException {
+ return subscribe(conn, subscriptionId, ack, durableId, selector,
destination, receipt, null);
+ }
+
+ public static ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ String selector,
+ String destination,
+ boolean receipt,
+ Integer consumerWindowSize) throws
IOException, InterruptedException {
+ return subscribe(conn, subscriptionId, ack, durableId, selector,
destination, receipt, consumerWindowSize,
Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE);
+ }
+
+ public static ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ String selector,
+ String destination,
+ boolean receipt,
+ Integer consumerWindowSize,
+ String consumerWindowSizeHeader)
throws IOException, InterruptedException {
+
ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE,
RoutingType.ANYCAST.toString())
.addHeader(Stomp.Headers.Subscribe.DESTINATION, destination);
@@ -412,6 +436,9 @@ public abstract class StompTestBase extends
ActiveMQTestBase {
if (selector != null) {
frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
}
+ if (consumerWindowSize != null) {
+ frame.addHeader(consumerWindowSizeHeader,
consumerWindowSize.toString());
+ }
String uuid = UUID.randomUUID().toString();
if (receipt) {
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index da1fc53..eb995e1 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -2368,6 +2369,174 @@ public class StompV12Test extends StompTestBase {
conn.disconnect();
}
+ @Test
+ public void testSubscribeWithZeroConsumerWindowSize() throws Exception {
+
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE,
true);
+ }
+
+ @Test
+ public void testSubscribeWithZeroConsumerWindowSizeLegacyHeader() throws
Exception {
+
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE,
true);
+ }
+
+ @Test
+ public void testSubscribeWithZeroConsumerWindowSizeAndNack() throws
Exception {
+
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE,
false);
+ }
+
+ @Test
+ public void testSubscribeWithZeroConsumerWindowSizeLegacyHeaderAndNack()
throws Exception {
+
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE,
false);
+ }
+
+ private void internalSubscribeWithZeroConsumerWindowSize(String
consumerWindowSizeHeader, boolean ack) throws Exception {
+ final int TIMEOUT = 1000;
+ conn.connect(defUser, defPass);
+ subscribe(conn, null,
Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL, null, null,
getQueuePrefix() + getQueueName(), true, 0, consumerWindowSizeHeader);
+
+ sendJmsMessage(getName());
+ sendJmsMessage(getName());
+ ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame1);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
+ String messageID = frame1.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+ ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNull(frame2);
+ if (ack) {
+ ack(conn, messageID);
+ } else {
+ nack(conn, messageID);
+ }
+
+ ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame3);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame3.getCommand());
+ messageID = frame3.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+ if (ack) {
+ ack(conn, messageID);
+ } else {
+ nack(conn, messageID);
+ }
+
+ conn.disconnect();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(TIMEOUT);
+ Assert.assertNull(message);
+ }
+
+ @Test
+ public void testSubscribeWithNonZeroConsumerWindowSize() throws Exception {
+
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE,
true);
+ }
+
+ @Test
+ public void testSubscribeWithNonZeroConsumerWindowSizeLegacyHeader() throws
Exception {
+
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE,
true);
+ }
+
+ @Test
+ public void testSubscribeWithNonZeroConsumerWindowSizeAndNack() throws
Exception {
+
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE,
false);
+ }
+
+ @Test
+ public void testSubscribeWithNonZeroConsumerWindowSizeLegacyHeaderAndNack()
throws Exception {
+
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE,
false);
+ }
+
+ private void internalSubscribeWithNonZeroConsumerWindowSize(String
consumerWindowSizeHeader, boolean ack) throws Exception {
+ // the size of each message was determined from the DEBUG logging from org.apache.activemq.artemis.core.protocol.stomp.StompConnection
+ final int MESSAGE_SIZE = 270;
+ final int TIMEOUT = 1000;
+ final String MESSAGE = "foo-foo-foo";
+
+ conn.connect(defUser, defPass);
+ subscribe(conn, null,
Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL, null, null,
getQueuePrefix() + getQueueName(), true, MESSAGE_SIZE * 2,
consumerWindowSizeHeader);
+
+ sendJmsMessage(MESSAGE);
+ sendJmsMessage(MESSAGE);
+ sendJmsMessage(MESSAGE);
+ ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame1);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
+ String messageID1 = frame1.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+ ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame2);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
+ String messageID2 = frame2.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+ ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNull(frame3);
+ if (ack) {
+ ack(conn, messageID1);
+ ack(conn, messageID2);
+ } else {
+ nack(conn, messageID1);
+ nack(conn, messageID2);
+ }
+
+ ClientStompFrame frame4 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame4);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame4.getCommand());
+ String messageID4 = frame4.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+ if (ack) {
+ ack(conn, messageID4);
+ } else {
+ nack(conn, messageID4);
+ }
+
+ conn.disconnect();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(TIMEOUT);
+ Assert.assertNull(message);
+ }
+
+ @Test
+ public void testSubscribeWithNonZeroConsumerWindowSizeAndClientAck() throws
Exception {
+
org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.DEBUG);
+ // the size of each message was determined from the DEBUG logging from org.apache.activemq.artemis.core.protocol.stomp.StompConnection
+ final int MESSAGE_SIZE = 270;
+ final int TIMEOUT = 1000;
+ final String MESSAGE = "foo-foo-foo";
+
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT,
null, null, getQueuePrefix() + getQueueName(), true, MESSAGE_SIZE * 2,
Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE);
+
+ sendJmsMessage(MESSAGE);
+ sendJmsMessage(MESSAGE);
+ sendJmsMessage(MESSAGE);
+ sendJmsMessage(MESSAGE);
+
+ ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame1);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
+ ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame2);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
+ String messageID2 = frame2.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+ ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNull(frame3);
+ // this should clear the first 2 messages since we're using CLIENT ack mode
+ ack(conn, messageID2);
+
+ ClientStompFrame frame4 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame4);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame4.getCommand());
+ ClientStompFrame frame5 = conn.receiveFrame(TIMEOUT);
+ Assert.assertNotNull(frame5);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame5.getCommand());
+ String messageID5 = frame5.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+ // this should clear the next 2 messages
+ ack(conn, messageID5);
+
+ conn.disconnect();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message message = consumer.receive(TIMEOUT);
+ Assert.assertNull(message);
+ }
+
private void ack(StompClientConnection conn, ClientStompFrame frame) throws
IOException, InterruptedException {
String messageID = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);