This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new e47eb5a  ARTEMIS-589 flow control for individual STOMP subscribers
     new 400623a  This closes #3258
e47eb5a is described below

commit e47eb5ae20f18c7cb4fa52d39c7015131a91e3dd
Author: Justin Bertram <[email protected]>
AuthorDate: Fri Sep 11 10:43:34 2020 -0500

    ARTEMIS-589 flow control for individual STOMP subscribers
---
 .../remoting/impl/netty/TransportConstants.java    |   6 +-
 .../artemis/core/protocol/stomp/Stomp.java         |   7 +
 .../core/protocol/stomp/StompConnection.java       |  15 +-
 .../artemis/core/protocol/stomp/StompFrame.java    |  17 ++-
 .../core/protocol/stomp/StompProtocolManager.java  |  15 +-
 .../artemis/core/protocol/stomp/StompSession.java  |  54 ++++---
 .../core/protocol/stomp/StompSubscription.java     |  18 +--
 .../protocol/stomp/VersionedStompFrameHandler.java |   8 +-
 .../artemis/core/server/ServerConsumer.java        |   2 +-
 .../artemis/core/server/ServerSession.java         |   2 +-
 .../core/server/impl/ServerConsumerImpl.java       |  10 +-
 .../core/server/impl/ServerSessionImpl.java        |   9 +-
 docs/user-manual/en/stomp.md                       |  46 ++++++
 .../tests/integration/cli/DummyServerConsumer.java |   4 +-
 .../tests/integration/stomp/StompTestBase.java     |  41 ++++-
 .../tests/integration/stomp/v12/StompV12Test.java  | 169 +++++++++++++++++++++
 16 files changed, 357 insertions(+), 66 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
index 018b2ba..8bc25a9 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java
@@ -165,9 +165,12 @@ public class TransportConstants {
 
    public static final String CLUSTER_CONNECTION = "clusterConnection";
 
+   @Deprecated
    public static final String STOMP_CONSUMERS_CREDIT = "stompConsumerCredits";
 
-   public static final int STOMP_DEFAULT_CONSUMERS_CREDIT = 10 * 1024; // 10K
+   public static final String STOMP_CONSUMER_WINDOW_SIZE = 
"stompConsumerWindowSize";
+
+   public static final int STOMP_DEFAULT_CONSUMER_WINDOW_SIZE = 10 * 1024; // 
10K
 
    public static final String PROXY_ENABLED_PROP_NAME = "socksEnabled";
 
@@ -396,6 +399,7 @@ public class TransportConstants {
       allowableAcceptorKeys.add(TransportConstants.DIRECT_DELIVER);
       allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
       allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
+      allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMER_WINDOW_SIZE);
       
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED);
       
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
       allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index fa02ee3..c965302 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -157,6 +157,13 @@ public interface Stomp {
           */
          String ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME = 
"activemq.subscriptionName";
 
+         /**
+          * Backwards compatibility for STOMP clients that were using 5.x
+          */
+         String ACTIVEMQ_PREFETCH_SIZE = "activemq.prefetchSize";
+
+         String CONSUMER_WINDOW_SIZE = "consumer-window-size";
+
          String SUBSCRIPTION_TYPE = "subscription-type";
 
          String NO_LOCAL = "no-local";
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 56ab51a..4fff4e8 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -723,12 +723,13 @@ public final class StompConnection implements 
RemotingConnection {
    }
 
    StompPostReceiptFunction subscribe(String destination,
-                  String selector,
-                  String ack,
-                  String id,
-                  String durableSubscriptionName,
-                  boolean noLocal,
-                  RoutingType subscriptionType) throws ActiveMQStompException {
+                                      String selector,
+                                      String ack,
+                                      String id,
+                                      String durableSubscriptionName,
+                                      boolean noLocal,
+                                      RoutingType subscriptionType,
+                                      Integer consumerWindowSize) throws 
ActiveMQStompException {
       autoCreateDestinationIfPossible(destination, subscriptionType);
       checkDestination(destination);
       checkRoutingSemantics(destination, subscriptionType);
@@ -756,7 +757,7 @@ public final class StompConnection implements 
RemotingConnection {
       }
 
       try {
-         return manager.subscribe(this, subscriptionID, 
durableSubscriptionName, destination, selector, ack, noLocal);
+         return manager.subscribe(this, subscriptionID, 
durableSubscriptionName, destination, selector, ack, noLocal, 
consumerWindowSize);
       } catch (ActiveMQStompException e) {
          throw e;
       } catch (Exception e) {
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java
index 1ba5d38..aa921dd 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompFrame.java
@@ -77,12 +77,19 @@ public class StompFrame {
 
    @Override
    public String toString() {
-      return new StringBuilder()
+      StringBuilder result = new StringBuilder()
          .append("StompFrame[command=").append(command)
-         .append(", headers=").append(headers)
-         .append(", content= ").append(this.body)
-         .append(", bytes= ").append(Arrays.toString(bytesBody))
-         .toString();
+         .append(", headers=").append(headers);
+
+      if (command.equals(Stomp.Responses.MESSAGE) || 
command.equals(Stomp.Responses.ERROR) || command.equals(Stomp.Commands.SEND)) {
+         result.append(", body=").append(this.getBody())
+               .append(", body-bytes=").append(Arrays.toString(bytesBody))
+               .append(", size=").append(size);
+      }
+
+      result.append("]");
+
+      return result.toString();
    }
 
    public boolean isPing() {
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 7c7caed..8546f9a 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -341,12 +341,13 @@ public class StompProtocolManager extends 
AbstractProtocolManager<StompFrame, St
    // Inner classes -------------------------------------------------
 
    public StompPostReceiptFunction subscribe(StompConnection connection,
-                         String subscriptionID,
-                         String durableSubscriptionName,
-                         String destination,
-                         String selector,
-                         String ack,
-                         boolean noLocal) throws Exception {
+                                             String subscriptionID,
+                                             String durableSubscriptionName,
+                                             String destination,
+                                             String selector,
+                                             String ack,
+                                             boolean noLocal,
+                                             Integer consumerWindowSize) 
throws Exception {
       StompSession stompSession = getSession(connection);
       stompSession.setNoLocal(noLocal);
       if (stompSession.containsSubscription(subscriptionID)) {
@@ -354,7 +355,7 @@ public class StompProtocolManager extends 
AbstractProtocolManager<StompFrame, St
             ". Either use unique subscription IDs or do not create multiple 
subscriptions for the same destination");
       }
       long consumerID = server.getStorageManager().generateID();
-      return stompSession.addSubscription(consumerID, subscriptionID, 
connection.getClientID(), durableSubscriptionName, destination, selector, ack);
+      return stompSession.addSubscription(consumerID, subscriptionID, 
connection.getClientID(), durableSubscriptionName, destination, selector, ack, 
consumerWindowSize);
    }
 
    public void unsubscribe(StompConnection connection,
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 786a114..7aba86e 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol.stomp;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -71,13 +72,10 @@ public class StompSession implements SessionCallback {
 
    private volatile boolean noLocal = false;
 
-   private final int consumerCredits;
-
    StompSession(final StompConnection connection, final StompProtocolManager 
manager, OperationContext sessionContext) {
       this.connection = connection;
       this.manager = manager;
       this.sessionContext = sessionContext;
-      this.consumerCredits = 
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, 
TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, 
connection.getAcceptorUsed().getConfiguration());
    }
 
    @Override
@@ -216,14 +214,13 @@ public class StompSession implements SessionCallback {
 
    public void acknowledge(String messageID, String subscriptionID) throws 
Exception {
       long id = Long.parseLong(messageID);
-      Pair<Long, Integer> pair = messagesToAck.remove(id);
+      Pair<Long, Integer> pair = messagesToAck.get(id);
 
       if (pair == null) {
          throw 
BUNDLE.failToAckMissingID(id).setHandler(connection.getFrameHandler());
       }
 
       long consumerID = pair.getA();
-      int credits = pair.getB();
 
       StompSubscription sub = subscriptions.get(consumerID);
 
@@ -233,30 +230,45 @@ public class StompSession implements SessionCallback {
          }
       }
 
-      if (this.consumerCredits != -1) {
-         session.receiveConsumerCredits(consumerID, credits);
-      }
-
       if 
(sub.getAck().equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL)) {
          session.individualAcknowledge(consumerID, id);
+
+         if (sub.getConsumerWindowSize() != -1) {
+            session.receiveConsumerCredits(consumerID, 
messagesToAck.remove(id).getB());
+         }
       } else {
-         session.acknowledge(consumerID, id);
+         List<Long> ackedRefs = session.acknowledge(consumerID, id);
+
+         if (sub.getConsumerWindowSize() != -1) {
+            for (Long ackedID : ackedRefs) {
+               session.receiveConsumerCredits(consumerID, 
messagesToAck.remove(ackedID).getB());
+            }
+         }
       }
 
       session.commit();
    }
 
    public StompPostReceiptFunction addSubscription(long consumerID,
-                               String subscriptionID,
-                               String clientID,
-                               String durableSubscriptionName,
-                               String destination,
-                               String selector,
-                               String ack) throws Exception {
+                                                   String subscriptionID,
+                                                   String clientID,
+                                                   String 
durableSubscriptionName,
+                                                   String destination,
+                                                   String selector,
+                                                   String ack,
+                                                   Integer consumerWindowSize) 
throws Exception {
       SimpleString address = SimpleString.toSimpleString(destination);
       SimpleString queueName = SimpleString.toSimpleString(destination);
       SimpleString selectorSimple = SimpleString.toSimpleString(selector);
-      final int receiveCredits = 
ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO) ? -1 : consumerCredits;
+      final int finalConsumerWindowSize;
+
+      if (consumerWindowSize != null) {
+         finalConsumerWindowSize = consumerWindowSize;
+      } else if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
+         finalConsumerWindowSize = -1;
+      } else {
+         finalConsumerWindowSize = 
ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMER_WINDOW_SIZE,
 ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, 
TransportConstants.STOMP_DEFAULT_CONSUMER_WINDOW_SIZE, 
connection.getAcceptorUsed().getConfiguration()), 
connection.getAcceptorUsed().getConfiguration());
+      }
 
       Set<RoutingType> routingTypes = 
manager.getServer().getAddressInfo(getCoreSession().removePrefix(address)).getRoutingTypes();
       boolean multicast = routingTypes.size() == 1 && 
routingTypes.contains(RoutingType.MULTICAST);
@@ -281,10 +293,14 @@ public class StompSession implements SessionCallback {
          }
       }
       final ServerConsumer consumer = session.createConsumer(consumerID, 
queueName, multicast ? null : selectorSimple, false, false, 0);
-      StompSubscription subscription = new StompSubscription(subscriptionID, 
ack, queueName, multicast);
+      StompSubscription subscription = new StompSubscription(subscriptionID, 
ack, queueName, multicast, finalConsumerWindowSize);
       subscriptions.put(consumerID, subscription);
       session.start();
-      return () -> consumer.receiveCredits(receiveCredits);
+      /*
+       * If the consumerWindowSize is 0 then we need to supply at least 1 
credit otherwise messages will *never* flow.
+       * See 
org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl#startSlowConsumer()
+       */
+      return () -> consumer.receiveCredits(finalConsumerWindowSize == 0 ? 1 : 
finalConsumerWindowSize);
    }
 
    public boolean unsubscribe(String id, String durableSubscriptionName, 
String clientID) throws Exception {
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
index de6044b..394a15c 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
@@ -19,9 +19,6 @@ package org.apache.activemq.artemis.core.protocol.stomp;
 import org.apache.activemq.artemis.api.core.SimpleString;
 
 public class StompSubscription {
-   // Constants -----------------------------------------------------
-
-   // Attributes ----------------------------------------------------
 
    private final String subID;
 
@@ -32,19 +29,16 @@ public class StompSubscription {
    // whether or not this subscription follows multicast semantics (e.g. for a 
JMS topic)
    private final boolean multicast;
 
-   // Static --------------------------------------------------------
-
-   // Constructors --------------------------------------------------
+   private final int consumerWindowSize;
 
-   public StompSubscription(String subID, String ack, SimpleString queueName, 
boolean multicast) {
+   public StompSubscription(String subID, String ack, SimpleString queueName, 
boolean multicast, int consumerWindowSize) {
       this.subID = subID;
       this.ack = ack;
       this.queueName = queueName;
       this.multicast = multicast;
+      this.consumerWindowSize = consumerWindowSize;
    }
 
-   // Public --------------------------------------------------------
-
    public String getAck() {
       return ack;
    }
@@ -61,9 +55,13 @@ public class StompSubscription {
       return multicast;
    }
 
+   public int getConsumerWindowSize() {
+      return consumerWindowSize;
+   }
+
    @Override
    public String toString() {
-      return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" 
+ queueName + ", multicast=" + multicast + "]";
+      return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" 
+ queueName + ", multicast=" + multicast + ", consumerWindowSize=" + 
consumerWindowSize + "]";
    }
 
 }
diff --git 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 682d590..2c64eb2 100644
--- 
a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ 
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -274,7 +274,13 @@ public abstract class VersionedStompFrameHandler {
       } else if (frame.hasHeader(Stomp.Headers.Subscribe.ACTIVEMQ_NO_LOCAL)) {
          noLocal = 
Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
       }
-      return connection.subscribe(destination, selector, ack, id, 
durableSubscriptionName, noLocal, routingType);
+      Integer consumerWindowSize = null;
+      if (frame.hasHeader(Headers.Subscribe.CONSUMER_WINDOW_SIZE)) {
+         consumerWindowSize = 
Integer.parseInt(frame.getHeader(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE));
+      } else if (frame.hasHeader(Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE)) {
+         consumerWindowSize = 
Integer.parseInt(frame.getHeader(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE));
+      }
+      return connection.subscribe(destination, selector, ack, id, 
durableSubscriptionName, noLocal, routingType, consumerWindowSize);
    }
 
    public String getDestination(StompFrame request) throws Exception {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
index 0c9c5bf..5236cae 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerConsumer.java
@@ -94,7 +94,7 @@ public interface ServerConsumer extends Consumer, 
ConsumerInfo {
                                                                  Object 
protocolDataStart,
                                                                  Object 
protocolDataEnd);
 
-   void acknowledge(Transaction tx, long messageID) throws Exception;
+   List<Long> acknowledge(Transaction tx, long messageID) throws Exception;
 
    void individualAcknowledge(Transaction tx, long messageID) throws Exception;
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index aff59ec..b319cda 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -62,7 +62,7 @@ public interface ServerSession extends SecurityAuth {
 
    boolean removeConsumer(long consumerID) throws Exception;
 
-   void acknowledge(long consumerID, long messageID) throws Exception;
+   List<Long> acknowledge(long consumerID, long messageID) throws Exception;
 
    void individualAcknowledge(long consumerID, long messageID) throws 
Exception;
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index e20c1e6..7f71970 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -888,11 +888,13 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
    }
 
    @Override
-   public synchronized void acknowledge(Transaction tx, final long messageID) 
throws Exception {
+   public synchronized List<Long> acknowledge(Transaction tx, final long 
messageID) throws Exception {
       if (browseOnly) {
-         return;
+         return null;
       }
 
+      List<Long> ackedRefs = null;
+
       // Acknowledge acknowledges all refs delivered by the consumer up to and 
including the one explicitly
       // acknowledged
 
@@ -909,6 +911,7 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
       try {
 
          MessageReference ref;
+         ackedRefs = new ArrayList<>();
          do {
             synchronized (lock) {
                ref = deliveringRefs.poll();
@@ -925,6 +928,7 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
             }
 
             ref.acknowledge(tx, this);
+            ackedRefs.add(ref.getMessageID());
 
             acks++;
          }
@@ -950,6 +954,8 @@ public class ServerConsumerImpl implements ServerConsumer, 
ReadyListener {
          }
          throw activeMQIllegalStateException;
       }
+
+      return ackedRefs;
    }
 
    @Override
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 454681a..bc29d20 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1191,8 +1191,9 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    }
 
    @Override
-   public void acknowledge(final long consumerID, final long messageID) throws 
Exception {
+   public List<Long> acknowledge(final long consumerID, final long messageID) 
throws Exception {
       ServerConsumer consumer = findConsumer(consumerID);
+      List<Long> ackedRefs = null;
 
       if (tx != null && tx.getState() == State.ROLLEDBACK) {
          // JBPAPP-8845 - if we let stuff to be acked on a rolled back TX, we 
will just
@@ -1200,7 +1201,7 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
          // The tx has already timed out, so we need to ack and rollback 
immediately
          Transaction newTX = newTransaction();
          try {
-            consumer.acknowledge(newTX, messageID);
+            ackedRefs = consumer.acknowledge(newTX, messageID);
          } catch (Exception e) {
             // just ignored
             // will log it just in case
@@ -1209,8 +1210,10 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
          }
          newTX.rollback();
       } else {
-         consumer.acknowledge(autoCommitAcks ? null : tx, messageID);
+         ackedRefs = consumer.acknowledge(autoCommitAcks ? null : tx, 
messageID);
       }
+
+      return ackedRefs;
    }
 
    @Override
diff --git a/docs/user-manual/en/stomp.md b/docs/user-manual/en/stomp.md
index 6076117..fe95c04 100644
--- a/docs/user-manual/en/stomp.md
+++ b/docs/user-manual/en/stomp.md
@@ -367,3 +367,49 @@ parameter on the acceptor.
 
 The `stomp-websockets` example shows how to configure an Apache ActiveMQ
 Artemis broker to have web browsers and Java applications exchanges messages.
+
+## Flow Control
+
+STOMP clients can use the `consumer-window-size` header on the `SUBSCRIBE`
+frame to control the flow of messages to clients. This is broadly discussed in
+the [Flow Control](flow-control.md) chapter.
+
+This ability is similiar to the `activemq.prefetchSize` header supported by
+ActiveMQ 5.x. However, that header specifies the size in terms of *messages*
+whereas `consumer-window-size` specifies the size in terms of *bytes*. ActiveMQ
+Artemis supports the `activemq.prefetchSize` header for backwards compatibility
+but the value will be interpreted as *bytes* just like `consumer-window-size`
+would be. If both `activemq.prefetchSize` and `consumer-window-size` are set
+then the value for `consumer-window-size` will be used.
+
+Setting `consumer-window-size` to `0` will ensure that once a STOMP client
+receives a message that it will *not* receive another one until it sends the
+appropriate `ACK` or `NACK` frame for the message it already has.
+
+Setting `consumer-window-size` to a value *greater than* `0` will allow it to
+receive messages until the cumulative bytes of those messages reaches the
+configured size. Once that happens the client will not receive any more
+messages until it sends the appropriate `ACK` or `NACK` frame for the messages
+it already has.
+
+Setting `consumer-window-size` to `-1` means there is no flow control and the
+broker will dispatch messages to clients as fast as it can.
+
+Flow control can be configured at the `acceptor` as well using the
+`stompConsumerWindowSize` URL parameter. This value is `10240` (i.e. 10K) by
+default for clients using `client` and `client-individual` acknowledgement
+modes. It is `-1` for clients using the `auto` acknowledgement mode. Even
+if `stompConsumerWindowSize` is set on the STOMP `acceptor` it will be
+overriden by the value provided by individual clients using the
+`consumer-window-size` header on their `SUBSCRIBE` frame.
+
+> **Note:**
+>
+> The `stompConsumerWindowSize` URL parameter used to be called
+> `stompConsumerCredits` but was changed to be more consistent with the new
+> header name (i.e. `consumer-window-size`). The `stompConsumerCredits`
+> parameter is deprecated but it will still work for the time being.
+
+Using the [DEBUG logging](#logging) mentioned earlier it is possible to see the
+size of the `MESSAGE` frames dispatched to clients. This can help when trying
+to determine the best `consumer-window-size` setting.
\ No newline at end of file
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 9858357..e594ee8 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -145,8 +145,8 @@ public class DummyServerConsumer implements ServerConsumer {
    }
 
    @Override
-   public void acknowledge(Transaction tx, long messageID) throws Exception {
-
+   public List<Long> acknowledge(Transaction tx, long messageID) throws 
Exception {
+      return null;
    }
 
    @Override
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 27bfc00..60f552e 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -178,7 +178,7 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
     * @throws Exception
     */
    protected ActiveMQServer createServer() throws Exception {
-      String stompAcceptorURI = "tcp://" + TransportConstants.DEFAULT_HOST + 
":" + TransportConstants.DEFAULT_STOMP_PORT + "?" + 
TransportConstants.STOMP_CONSUMERS_CREDIT + "=-1";
+      String stompAcceptorURI = "tcp://" + TransportConstants.DEFAULT_HOST + 
":" + TransportConstants.DEFAULT_STOMP_PORT + "?" + 
TransportConstants.STOMP_CONSUMER_WINDOW_SIZE + "=-1";
       if (isEnableStompMessageId()) {
          stompAcceptorURI += ";" + TransportConstants.STOMP_ENABLE_MESSAGE_ID 
+ "=true";
       }
@@ -391,12 +391,36 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
    }
 
    public static ClientStompFrame subscribe(StompClientConnection conn,
-                                     String subscriptionId,
-                                     String ack,
-                                     String durableId,
-                                     String selector,
-                                     String destination,
-                                     boolean receipt) throws IOException, 
InterruptedException {
+                                            String subscriptionId,
+                                            String ack,
+                                            String durableId,
+                                            String selector,
+                                            String destination,
+                                            boolean receipt) throws 
IOException, InterruptedException {
+      return subscribe(conn, subscriptionId, ack, durableId, selector, 
destination, receipt, null);
+   }
+
+   public static ClientStompFrame subscribe(StompClientConnection conn,
+                                            String subscriptionId,
+                                            String ack,
+                                            String durableId,
+                                            String selector,
+                                            String destination,
+                                            boolean receipt,
+                                            Integer consumerWindowSize) throws 
IOException, InterruptedException {
+      return subscribe(conn, subscriptionId, ack, durableId, selector, 
destination, receipt, consumerWindowSize, 
Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE);
+   }
+
+   public static ClientStompFrame subscribe(StompClientConnection conn,
+                                            String subscriptionId,
+                                            String ack,
+                                            String durableId,
+                                            String selector,
+                                            String destination,
+                                            boolean receipt,
+                                            Integer consumerWindowSize,
+                                            String consumerWindowSizeHeader) 
throws IOException, InterruptedException {
+
       ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
                                    
.addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, 
RoutingType.ANYCAST.toString())
                                    
.addHeader(Stomp.Headers.Subscribe.DESTINATION, destination);
@@ -412,6 +436,9 @@ public abstract class StompTestBase extends 
ActiveMQTestBase {
       if (selector != null) {
          frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
       }
+      if (consumerWindowSize != null) {
+         frame.addHeader(consumerWindowSizeHeader, 
consumerWindowSize.toString());
+      }
 
       String uuid = UUID.randomUUID().toString();
       if (receipt) {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index da1fc53..eb995e1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -38,6 +38,7 @@ import java.util.regex.Pattern;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.protocol.stomp.StompConnection;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
 import 
org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
@@ -2368,6 +2369,174 @@ public class StompV12Test extends StompTestBase {
       conn.disconnect();
    }
 
+   @Test
+   public void testSubscribeWithZeroConsumerWindowSize() throws Exception {
+      
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE,
 true);
+   }
+
+   @Test
+   public void testSubscribeWithZeroConsumerWindowSizeLegacyHeader() throws 
Exception {
+      
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE,
 true);
+   }
+
+   @Test
+   public void testSubscribeWithZeroConsumerWindowSizeAndNack() throws 
Exception {
+      
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE,
 false);
+   }
+
+   @Test
+   public void testSubscribeWithZeroConsumerWindowSizeLegacyHeaderAndNack() 
throws Exception {
+      
internalSubscribeWithZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE,
 false);
+   }
+
+   private void internalSubscribeWithZeroConsumerWindowSize(String 
consumerWindowSizeHeader, boolean ack) throws Exception {
+      final int TIMEOUT = 1000;
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, 
Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL, null, null, 
getQueuePrefix() + getQueueName(), true, 0, consumerWindowSizeHeader);
+
+      sendJmsMessage(getName());
+      sendJmsMessage(getName());
+      ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame1);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
+      String messageID = frame1.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+      ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNull(frame2);
+      if (ack) {
+         ack(conn, messageID);
+      } else {
+         nack(conn, messageID);
+      }
+
+      ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame3);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame3.getCommand());
+      messageID = frame3.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+      if (ack) {
+         ack(conn, messageID);
+      } else {
+         nack(conn, messageID);
+      }
+
+      conn.disconnect();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(TIMEOUT);
+      Assert.assertNull(message);
+   }
+
+   @Test
+   public void testSubscribeWithNonZeroConsumerWindowSize() throws Exception {
+      
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE,
 true);
+   }
+
+   @Test
+   public void testSubscribeWithNonZeroConsumerWindowSizeLegacyHeader() throws 
Exception {
+      
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE,
 true);
+   }
+
+   @Test
+   public void testSubscribeWithNonZeroConsumerWindowSizeAndNack() throws 
Exception {
+      
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE,
 false);
+   }
+
+   @Test
+   public void testSubscribeWithNonZeroConsumerWindowSizeLegacyHeaderAndNack() 
throws Exception {
+      
internalSubscribeWithNonZeroConsumerWindowSize(Stomp.Headers.Subscribe.ACTIVEMQ_PREFETCH_SIZE,
 false);
+   }
+
+   private void internalSubscribeWithNonZeroConsumerWindowSize(String 
consumerWindowSizeHeader, boolean ack) throws Exception {
+      // the size of each message was determined from the DEBUG logging from 
org.apache.activemq.artemis.core.protocol.stomp.StompConnection
+      final int MESSAGE_SIZE = 270;
+      final int TIMEOUT = 1000;
+      final String MESSAGE = "foo-foo-foo";
+
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, 
Stomp.Headers.Subscribe.AckModeValues.CLIENT_INDIVIDUAL, null, null, 
getQueuePrefix() + getQueueName(), true, MESSAGE_SIZE * 2, 
consumerWindowSizeHeader);
+
+      sendJmsMessage(MESSAGE);
+      sendJmsMessage(MESSAGE);
+      sendJmsMessage(MESSAGE);
+      ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame1);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
+      String messageID1 = frame1.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+      ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame2);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
+      String messageID2 = frame2.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+      ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNull(frame3);
+      if (ack) {
+         ack(conn, messageID1);
+         ack(conn, messageID2);
+      } else {
+         nack(conn, messageID1);
+         nack(conn, messageID2);
+      }
+
+      ClientStompFrame frame4 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame4);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame4.getCommand());
+      String messageID4 = frame4.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+      if (ack) {
+         ack(conn, messageID4);
+      } else {
+         nack(conn, messageID4);
+      }
+
+      conn.disconnect();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(TIMEOUT);
+      Assert.assertNull(message);
+   }
+
+   @Test
+   public void testSubscribeWithNonZeroConsumerWindowSizeAndClientAck() throws 
Exception {
+      
org.jboss.logmanager.Logger.getLogger(StompConnection.class.getName()).setLevel(org.jboss.logmanager.Level.DEBUG);
+      // the size of each message was determined from the DEBUG logging from 
org.apache.activemq.artemis.core.protocol.stomp.StompConnection
+      final int MESSAGE_SIZE = 270;
+      final int TIMEOUT = 1000;
+      final String MESSAGE = "foo-foo-foo";
+
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT, 
null, null, getQueuePrefix() + getQueueName(), true, MESSAGE_SIZE * 2, 
Stomp.Headers.Subscribe.CONSUMER_WINDOW_SIZE);
+
+      sendJmsMessage(MESSAGE);
+      sendJmsMessage(MESSAGE);
+      sendJmsMessage(MESSAGE);
+      sendJmsMessage(MESSAGE);
+
+      ClientStompFrame frame1 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame1);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame1.getCommand());
+      ClientStompFrame frame2 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame2);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame2.getCommand());
+      String messageID2 = frame2.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+      ClientStompFrame frame3 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNull(frame3);
+      // this should clear the first 2 messages since we're using CLIENT ack 
mode
+      ack(conn, messageID2);
+
+      ClientStompFrame frame4 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame4);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame4.getCommand());
+      ClientStompFrame frame5 = conn.receiveFrame(TIMEOUT);
+      Assert.assertNotNull(frame5);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame5.getCommand());
+      String messageID5 = frame5.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+      // this should clear the next 2 messages
+      ack(conn, messageID5);
+
+      conn.disconnect();
+
+      MessageConsumer consumer = session.createConsumer(queue);
+      Message message = consumer.receive(TIMEOUT);
+      Assert.assertNull(message);
+   }
+
    private void ack(StompClientConnection conn, ClientStompFrame frame) throws 
IOException, InterruptedException {
       String messageID = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
 

Reply via email to