Github user andytaylor commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2306#discussion_r217312411

    --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---
    @@ -573,20 +575,38 @@ public void offerProducerCredit(final SimpleString address,
                                       final int threshold,
                                       final Receiver receiver) {
           try {
    +         /*
    +         * The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the
    +         * runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this
    +         * may cause a memory leak, one is enough.
    +         * */
    +         if (creditRunnable != null && !creditRunnable.isRun())
    +            return;
              PagingManager pagingManager = manager.getServer().getPagingManager();
    -         Runnable creditRunnable = () -> {
    -            connection.lock();
    -            try {
    -               if (receiver.getCredit() <= threshold) {
    -                  int topUp = credits - receiver.getCredit();
    -                  if (topUp > 0) {
    -                     receiver.flow(topUp);
    +         creditRunnable = new CreditRunnable() {
    +            boolean isRun = false;
    +            @Override
    +            public boolean isRun() {
    +               return isRun;
    +            }
    +
    +            @Override
    +            public void run() {
    +               connection.lock();
    +               try {
    +                  if (receiver.getCredit() <= threshold) {
    +                     int topUp = credits - receiver.getCredit();
    +                     System.out.println("topUp = " + topUp);
    --- End diff --

    my bad
---