Github user michaelandrepearce commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2467#discussion_r242787783
--- Diff:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
---
@@ -122,7 +136,53 @@ public Object getBrokerConsumer() {
@Override
public void onFlow(int currentCredits, boolean drain) {
- sessionSPI.onFlowConsumer(brokerConsumer, currentCredits, drain);
+ connection.requireInHandler();
+
+ setupCredit();
+
+ ServerConsumerImpl serverConsumer = (ServerConsumerImpl) brokerConsumer;
+ if (drain) {
+ // If the draining is already running, then don't do anything
+ if (draining.compareAndSet(false, true)) {
+ final ProtonServerSenderContext plugSender = (ProtonServerSenderContext) serverConsumer.getProtocolContext();
+ serverConsumer.forceDelivery(1, new Runnable() {
+ @Override
+ public void run() {
+ try {
+ connection.runNow(() -> {
+ plugSender.reportDrained();
+ setupCredit();
+ });
+ } finally {
+ draining.set(false);
+ }
+ }
+ });
+ }
+ } else {
+ serverConsumer.receiveCredits(-1);
+ }
+ }
+
+ public boolean hasCredits() {
+ if (!connection.flowControl(onflowControlReady)) {
+ return false;
+ }
+
+ //return true;
+ //return getSender().getCredit() > 0;
--- End diff --
Remove the commented-out code (the two `//return` lines above).
---