GitHub user andytaylor commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/2306#discussion_r217312411
--- Diff:
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
---
@@ -573,20 +575,38 @@ public void offerProducerCredit(final SimpleString
address,
final int threshold,
final Receiver receiver) {
try {
+ /*
+ * The credit runnable will always be run in this thread unless
the address or disc is full. If this is the case the
+ * runnable is run once the memory or disc is free, if this
happens we don't want to keep adding runnables as this
+ * may cause a memory leak, one is enough.
+ * */
+ if (creditRunnable != null && !creditRunnable.isRun())
+ return;
PagingManager pagingManager =
manager.getServer().getPagingManager();
- Runnable creditRunnable = () -> {
- connection.lock();
- try {
- if (receiver.getCredit() <= threshold) {
- int topUp = credits - receiver.getCredit();
- if (topUp > 0) {
- receiver.flow(topUp);
+ creditRunnable = new CreditRunnable() {
+ boolean isRun = false;
+ @Override
+ public boolean isRun() {
+ return isRun;
+ }
+
+ @Override
+ public void run() {
+ connection.lock();
+ try {
+ if (receiver.getCredit() <= threshold) {
+ int topUp = credits - receiver.getCredit();
+ System.out.println("topUp = " + topUp);
--- End diff --
My bad, the debug `System.out.println` above was left in by mistake.
---