This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new cedc050e03 ARTEMIS-4569 Blocked Producers will hold runnables until 
messages are consumed.
cedc050e03 is described below

commit cedc050e03abf67c2c1e0bfa3cabed8266fdf14f
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Tue Jan 16 12:14:28 2024 -0500

    ARTEMIS-4569 Blocked Producers will hold runnables until messages are 
consumed.
    
    When initially developed the expectation was that no more producers would 
keep connecting but in a scenario like this
    the consumers could actually give up and things will just accumulate on the 
server.
    
    We should cleanup these upon disconnect.
---
 .../artemis/utils/runnables/AtomicRunnable.java    |  41 +++++-
 .../artemis/utils/runnables/RunnableList.java      |  56 +++++++
 .../artemis/utils/runnables/RunnableListTest.java  | 135 +++++++++++++++++
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   6 +-
 .../amqp/broker/AMQPSessionCallbackTest.java       |   7 +-
 .../core/protocol/openwire/amq/AMQSession.java     | 108 +++++++-------
 .../activemq/artemis/core/paging/PagingStore.java  |   6 +-
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  37 +++--
 .../core/server/impl/ServerSessionImpl.java        |  12 +-
 .../artemis/tests/leak/MemoryAssertions.java       |  10 +-
 .../tests/leak/ProducerBlockedLeakTest.java        | 163 +++++++++++++++++++++
 .../storage/PersistMultiThreadTest.java            |   6 +-
 .../unit/core/paging/impl/PagingStoreImplTest.java |  12 +-
 13 files changed, 514 insertions(+), 85 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
index 9ec5e8f48d..aa2b7c1dea 100644
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java
@@ -18,9 +18,13 @@
 package org.apache.activemq.artemis.utils.runnables;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Consumer;
 
 public abstract class AtomicRunnable implements Runnable {
 
+   public static AtomicRunnable delegate(Runnable runnable) {
+      return new AtomicRunnableWithDelegate(runnable);
+   }
 
    public static AtomicRunnable checkAtomic(Runnable run) {
       if (run instanceof AtomicRunnable) {
@@ -30,6 +34,27 @@ public abstract class AtomicRunnable implements Runnable {
       }
    }
 
+   private RunnableList acceptedList;
+   private Consumer<AtomicRunnable> cancelTask;
+
+   public RunnableList getAcceptedList() {
+      return acceptedList;
+   }
+
+   public AtomicRunnable setAcceptedList(RunnableList acceptedList) {
+      this.acceptedList = acceptedList;
+      return this;
+   }
+
+   public Consumer<AtomicRunnable> getCancel() {
+      return cancelTask;
+   }
+
+   public AtomicRunnable setCancel(Consumer<AtomicRunnable> cancelTask) {
+      this.cancelTask = cancelTask;
+      return this;
+   }
+
    private volatile int ran;
 
    private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
@@ -52,7 +77,21 @@ public abstract class AtomicRunnable implements Runnable {
    @Override
    public void run() {
       if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
-         atomicRun();
+         try {
+            atomicRun();
+         } finally {
+            if (acceptedList != null) {
+               acceptedList.remove(AtomicRunnable.this);
+            }
+         }
+      }
+   }
+
+   public void cancel() {
+      if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
+         if (cancelTask != null) {
+            cancelTask.accept(AtomicRunnable.this);
+         }
       }
    }
 
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java
new file mode 100644
index 0000000000..a2bef5eec6
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+import java.util.HashSet;
+import java.util.function.Consumer;
+
+public class RunnableList {
+
+   private final HashSet<AtomicRunnable> list = new HashSet<>();
+
+   public RunnableList() {
+   }
+
+   public synchronized void add(AtomicRunnable runnable) {
+      runnable.setAcceptedList(this);
+      list.add(runnable);
+   }
+
+   public int size() {
+      return list.size();
+   }
+
+   public synchronized void remove(AtomicRunnable runnable) {
+      list.remove(runnable);
+   }
+
+   public synchronized void cancel() {
+      list.forEach(this::cancel);
+      list.clear();
+   }
+
+   private void cancel(AtomicRunnable atomicRunnable) {
+      atomicRunnable.cancel();
+   }
+
+   public void forEach(Consumer<AtomicRunnable> consumerRunnable) {
+      list.forEach(consumerRunnable);
+   }
+
+}
diff --git 
a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/runnables/RunnableListTest.java
 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/runnables/RunnableListTest.java
new file mode 100644
index 0000000000..8d21fd58d6
--- /dev/null
+++ 
b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/runnables/RunnableListTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.utils.runnables;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RunnableListTest {
+
+   HashSet<AtomicRunnable> masterList = new HashSet<>();
+
+   @Test
+   public void testRunning() {
+      AtomicInteger result = new AtomicInteger();
+
+      RunnableList listA = new RunnableList();
+      RunnableList listB = new RunnableList();
+      RunnableList listC = new RunnableList();
+
+      RunnableList[] lists = new RunnableList[]{listA, listB, listC};
+      for (RunnableList l : lists) {
+         for (int i = 0; i < 10; i++) {
+            AtomicRunnable runnable = new AtomicRunnable() {
+               @Override
+               public void atomicRun() {
+                  result.incrementAndGet();
+                  masterList.remove(this);
+               }
+            };
+            addItem(l, runnable);
+         }
+      }
+
+      Assert.assertEquals(30, masterList.size());
+
+      runList(listA);
+
+      Assert.assertEquals(10, result.get());
+
+      Assert.assertEquals(20, masterList.size());
+      Assert.assertEquals(0, listA.size());
+      Assert.assertEquals(10, listB.size());
+      Assert.assertEquals(10, listC.size());
+
+      HashSet<AtomicRunnable> copyList = new HashSet<>();
+      copyList.addAll(masterList);
+
+      copyList.forEach(r -> r.run());
+
+      for (RunnableList l : lists) {
+         Assert.assertEquals(0, l.size());
+      }
+
+      Assert.assertEquals(30, result.get());
+   }
+
+   @Test
+   public void testCancel() {
+      AtomicInteger result = new AtomicInteger();
+
+      RunnableList listA = new RunnableList();
+      RunnableList listB = new RunnableList();
+      RunnableList listC = new RunnableList();
+
+      RunnableList[] lists = new RunnableList[]{listA, listB, listC};
+      for (RunnableList l : lists) {
+         for (int i = 0; i < 10; i++) {
+            AtomicRunnable runnable = new AtomicRunnable() {
+               @Override
+               public void atomicRun() {
+                  result.incrementAndGet();
+                  masterList.remove(this);
+               }
+            };
+            addItem(l, runnable);
+         }
+      }
+
+      Assert.assertEquals(30, masterList.size());
+
+      listA.cancel();
+
+      Assert.assertEquals(0, result.get());
+
+      Assert.assertEquals(20, masterList.size());
+      Assert.assertEquals(0, listA.size());
+      Assert.assertEquals(10, listB.size());
+      Assert.assertEquals(10, listC.size());
+
+      listB.cancel();
+      listC.cancel();
+
+      for (RunnableList l : lists) {
+         Assert.assertEquals(0, l.size());
+      }
+
+      Assert.assertEquals(0, masterList.size());
+   }
+
+   // runs all AtomicRunnables inside the list
+   private void runList(RunnableList list) {
+      // make a copy of all the tasks to a new list
+      ArrayList<AtomicRunnable> toRun = new ArrayList<>();
+      list.forEach(toRun::add);
+
+      // run all the elements
+      toRun.forEach(r -> r.run());
+   }
+
+   private void addItem(RunnableList list, AtomicRunnable runnable) {
+      list.add(runnable);
+      runnable.setCancel(masterList::remove);
+      masterList.add(runnable);
+   }
+
+}
\ No newline at end of file
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 47941c8621..c8f081b30b 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -64,6 +64,7 @@ import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SelectorTranslator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.apache.activemq.artemis.utils.runnables.RunnableList;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -112,6 +113,8 @@ public class AMQPSessionCallback implements SessionCallback 
{
 
    private ProtonTransactionHandler transactionHandler;
 
+   private final RunnableList blockedRunnables = new RunnableList();
+
    public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
@@ -384,6 +387,7 @@ public class AMQPSessionCallback implements SessionCallback 
{
    }
 
    public void close() throws Exception {
+      blockedRunnables.cancel();
       //need to check here as this can be called if init fails
       if (serverSession != null) {
          // we cannot hold the nettyExecutor on this rollback here, otherwise 
other connections will be waiting
@@ -610,7 +614,7 @@ public class AMQPSessionCallback implements SessionCallback 
{
          } else {
             final PagingStore store = 
manager.getServer().getPagingManager().getPageStore(address);
             if (store != null) {
-               store.checkMemory(runnable);
+               store.checkMemory(runnable, blockedRunnables::add);
             } else {
                runnable.run();
             }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
index a902a26894..244a65baaf 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.protocol.amqp.broker;
 
 import java.util.concurrent.Executor;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagingManager;
@@ -157,7 +158,7 @@ public class AMQPSessionCallbackTest {
       session.flow(new SimpleString("test"), 
ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, 
AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
 
       // Run the credit refill code.
-      Mockito.verify(pagingStore).checkMemory(argument.capture());
+      Mockito.verify(pagingStore).checkMemory(argument.capture(), 
Mockito.isA(Consumer.class));
       assertNotNull(argument.getValue());
       argument.getValue().run();
 
@@ -188,7 +189,7 @@ public class AMQPSessionCallbackTest {
       session.flow(new SimpleString("test"), 
ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, 
AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
 
       // Run the credit refill code.
-      Mockito.verify(pagingStore).checkMemory(argument.capture());
+      Mockito.verify(pagingStore).checkMemory(argument.capture(), 
Mockito.isA(Consumer.class));
       assertNotNull(argument.getValue());
       argument.getValue().run();
 
@@ -249,7 +250,7 @@ public class AMQPSessionCallbackTest {
       session.flow(new SimpleString("test"), 
ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, 
receiver, connection));
 
       // Run the credit refill code.
-      Mockito.verify(pagingStore).checkMemory(argument.capture());
+      Mockito.verify(pagingStore).checkMemory(argument.capture(), 
Mockito.isA(Consumer.class));
       assertNotNull(argument.getValue());
       argument.getValue().run();
 
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 9c096fe545..1e267d27c8 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -51,6 +51,8 @@ import 
org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.IDGenerator;
 import org.apache.activemq.artemis.utils.SimpleIDGenerator;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
+import org.apache.activemq.artemis.utils.runnables.RunnableList;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerInfo;
@@ -78,6 +80,8 @@ public class AMQSession implements SessionCallback {
    private final ActiveMQServer server;
    private final OpenWireConnection connection;
 
+   private final RunnableList blockedRunnables = new RunnableList();
+
    private final AtomicBoolean started = new AtomicBoolean(false);
 
    private final ScheduledExecutorService scheduledPool;
@@ -320,8 +324,7 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public void closed() {
-      // TODO Auto-generated method stub
-
+      blockedRunnables.cancel();
    }
 
    @Override
@@ -338,6 +341,7 @@ public class AMQSession implements SessionCallback {
 
    @Override
    public void disconnect(ServerConsumer serverConsumer, String errorMessage) {
+      blockedRunnables.cancel();
       // for an openwire consumer this is fatal because unlike with activemq5 
sending
       // to the address will not auto create the consumer binding and it will 
be in limbo.
       // forcing disconnect allows it to failover and recreate its binding.
@@ -412,7 +416,7 @@ public class AMQSession implements SessionCallback {
             sendShouldBlockProducer(producerInfo, messageSend, 
sendProducerAck, store, dest, count, coreMsg, address);
          } else {
             if (store != null) {
-               if (!store.checkMemory(true, this::restoreAutoRead, 
this::blockConnection)) {
+               if (!store.checkMemory(true, 
AtomicRunnable.delegate(this::restoreAutoRead), 
AtomicRunnable.delegate(this::blockConnection), this.blockedRunnables::add)) {
                   restoreAutoRead();
                   throw new ResourceAllocationException("Queue is full " + 
address);
                }
@@ -440,62 +444,65 @@ public class AMQSession implements SessionCallback {
                                         final AtomicInteger count,
                                         final 
org.apache.activemq.artemis.api.core.Message coreMsg,
                                         final SimpleString address) throws 
ResourceAllocationException {
-      final Runnable task = () -> {
-         Exception exceptionToSend = null;
 
-         try {
-            getCoreSession().send(coreMsg, false, 
producerInfo.getProducerId().toString(), dest.isTemporary());
-         } catch (Exception e) {
-            logger.debug("Sending exception to the client", e);
-            exceptionToSend = e;
-         }
-         connection.enableTtl();
-         if (count == null || count.decrementAndGet() == 0) {
-            if (exceptionToSend != null) {
-               this.connection.getContext().setDontSendReponse(false);
-               connection.sendException(exceptionToSend);
-            } else {
-               server.getStorageManager().afterCompleteOperations(new 
IOCallback() {
-                  @Override
-                  public void done() {
-                     if (sendProducerAck) {
-                        try {
-                           ProducerAck ack = new 
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-                           connection.dispatchAsync(ack);
-                        } catch (Exception e) {
+      final AtomicRunnable task = new AtomicRunnable() {
+         @Override
+         public void atomicRun() {
+            Exception exceptionToSend = null;
+            try {
+               getCoreSession().send(coreMsg, false, 
producerInfo.getProducerId().toString(), dest.isTemporary());
+            } catch (Exception e) {
+               logger.debug("Sending exception to the client", e);
+               exceptionToSend = e;
+            }
+            connection.enableTtl();
+            if (count == null || count.decrementAndGet() == 0) {
+               if (exceptionToSend != null) {
+                  connection.getContext().setDontSendReponse(false);
+                  connection.sendException(exceptionToSend);
+               } else {
+                  server.getStorageManager().afterCompleteOperations(new 
IOCallback() {
+                     @Override
+                     public void done() {
+                        if (sendProducerAck) {
+                           try {
+                              ProducerAck ack = new 
ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+                              connection.dispatchAsync(ack);
+                           } catch (Exception e) {
+                              
connection.getContext().setDontSendReponse(false);
+                              logger.warn(e.getMessage(), e);
+                              connection.sendException(e);
+                           }
+                        } else {
                            connection.getContext().setDontSendReponse(false);
-                           logger.warn(e.getMessage(), e);
-                           connection.sendException(e);
+                           try {
+                              Response response = new Response();
+                              
response.setCorrelationId(messageSend.getCommandId());
+                              connection.dispatchAsync(response);
+                           } catch (Exception e) {
+                              logger.warn(e.getMessage(), e);
+                              connection.sendException(e);
+                           }
                         }
-                     } else {
-                        connection.getContext().setDontSendReponse(false);
+                     }
+
+                     @Override
+                     public void onError(int errorCode, String errorMessage) {
                         try {
-                           Response response = new Response();
-                           
response.setCorrelationId(messageSend.getCommandId());
-                           connection.dispatchAsync(response);
-                        } catch (Exception e) {
-                           logger.warn(e.getMessage(), e);
-                           connection.sendException(e);
+                           final IOException e = new IOException(errorMessage);
+                           logger.warn(errorMessage);
+                           connection.serviceException(e);
+                        } catch (Exception ex) {
+                           logger.debug(ex.getMessage(), ex);
                         }
                      }
-                  }
-
-                  @Override
-                  public void onError(int errorCode, String errorMessage) {
-                     try {
-                        final IOException e = new IOException(errorMessage);
-                        logger.warn(errorMessage);
-                        connection.serviceException(e);
-                     } catch (Exception ex) {
-                        logger.debug(ex.getMessage(), ex);
-                     }
-                  }
-               });
+                  });
+               }
             }
          }
       };
       if (store != null) {
-         if (!store.checkMemory(false, task, null)) {
+         if (!store.checkMemory(false, task, null, blockedRunnables::add)) {
             this.connection.getContext().setDontSendReponse(false);
             connection.enableTtl();
             throw new ResourceAllocationException("Queue is full " + address);
@@ -542,11 +549,12 @@ public class AMQSession implements SessionCallback {
    }
 
    public void close() throws Exception {
-      this.coreSession.close(false);
+      this.close(false);
    }
 
    @Override
    public void close(boolean failed) {
+      blockedRunnables.cancel();
       try {
          this.coreSession.close(failed);
       } catch (Exception bestEffort) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index b9221ba186..6ee69e07bd 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging;
 
 import java.io.File;
 import java.util.Collection;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.RefCountMessageListener;
@@ -35,6 +36,7 @@ import 
org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 
 /**
  * <p>
@@ -175,9 +177,9 @@ public interface PagingStore extends ActiveMQComponent, 
RefCountMessageListener
       addSize(size, false);
    }
 
-   boolean checkMemory(Runnable runnable);
+   boolean checkMemory(Runnable runnable, Consumer<AtomicRunnable> 
blockedCallback);
 
-   boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable 
runWhenBlocking);
+   boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable 
runWhenBlocking, Consumer<AtomicRunnable> blockedCallback);
 
    boolean isFull();
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index adbc1028a6..0fedf9a647 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -1030,16 +1030,25 @@ public class PagingStoreImpl implements PagingStore {
    }
 
    @Override
-   public boolean checkMemory(final Runnable runWhenAvailable) {
-      return checkMemory(true, runWhenAvailable, null);
+   public boolean checkMemory(final Runnable runWhenAvailable, 
Consumer<AtomicRunnable> blockedCallback) {
+      return checkMemory(true, runWhenAvailable, null, blockedCallback);
+   }
+
+   private void addToBlockList(AtomicRunnable atomicRunnable, 
Consumer<AtomicRunnable> accepted) {
+      onMemoryFreedRunnables.add(atomicRunnable);
+      atomicRunnable.setCancel(onMemoryFreedRunnables::remove);
+      if (accepted != null) {
+         accepted.accept(atomicRunnable);
+      }
    }
 
    @Override
-   public boolean checkMemory(boolean runOnFailure, final Runnable 
runWhenAvailable, Runnable runWhenBlocking) {
+   public boolean checkMemory(boolean runOnFailure, Runnable 
runWhenAvailableParameter, Runnable runWhenBlocking, Consumer<AtomicRunnable> 
blockedCallback) {
+      AtomicRunnable runWhenAvailable = 
AtomicRunnable.checkAtomic(runWhenAvailableParameter);
 
       if (blockedViaAddressControl) {
          if (runWhenAvailable != null) {
-            
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+            addToBlockList(runWhenAvailable, blockedCallback);
          }
          return false;
       }
@@ -1047,7 +1056,7 @@ public class PagingStoreImpl implements PagingStore {
       if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && 
(maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize || 
pagingManager.isDiskFull())) {
          if (isFull()) {
             if (runOnFailure && runWhenAvailable != null) {
-               
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+               addToBlockList(runWhenAvailable, blockedCallback);
             }
             return false;
          }
@@ -1057,8 +1066,7 @@ public class PagingStoreImpl implements PagingStore {
                runWhenBlocking.run();
             }
 
-            AtomicRunnable atomicRunWhenAvailable = 
AtomicRunnable.checkAtomic(runWhenAvailable);
-            onMemoryFreedRunnables.add(atomicRunWhenAvailable);
+            addToBlockList(runWhenAvailable, blockedCallback);
 
             // We check again to avoid a race condition where the size can 
come down just after the element
             // has been added, but the check to execute was done before the 
element was added
@@ -1066,7 +1074,8 @@ public class PagingStoreImpl implements PagingStore {
             // MUCH better performance in a highly concurrent environment
             if (!pagingManager.isGlobalFull() && !full) {
                // run it now
-               atomicRunWhenAvailable.run();
+               runWhenAvailable.run();
+               onMemoryFreedRunnables.remove(runWhenAvailable);
             } else {
                if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
                   pagingManager.addBlockedStore(this);
@@ -1122,13 +1131,11 @@ public class PagingStoreImpl implements PagingStore {
    @Override
    public boolean checkReleasedMemory() {
       if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && !full) 
{
-         if (!onMemoryFreedRunnables.isEmpty()) {
-            executor.execute(this::memoryReleased);
-            if (blocking) {
-               
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, getPageInfo());
-               blocking = false;
-               return true;
-            }
+         executor.execute(this::memoryReleased);
+         if (blocking) {
+            ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, 
getPageInfo());
+            blocking = false;
+            return true;
          }
       }
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d2f7eba3bc..55cf83596a 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -106,6 +106,8 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.JsonLoader;
 import org.apache.activemq.artemis.utils.PrefixUtil;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
+import org.apache.activemq.artemis.utils.runnables.RunnableList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -140,6 +142,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
 
    protected final Map<Long, ServerConsumer> consumers = new 
ConcurrentHashMap<>();
 
+   private final RunnableList blockedRunnables = new RunnableList();
+
    protected final ServerProducers serverProducers;
 
    protected volatile Transaction tx;
@@ -391,6 +395,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
    }
 
    protected void doClose(final boolean failed) throws Exception {
+      blockedRunnables.cancel();
+
       if (callback != null) {
          callback.close(failed);
       }
@@ -2007,12 +2013,12 @@ public class ServerSessionImpl implements 
ServerSession, FailureListener {
 
       if (store == null) {
          callback.sendProducerCreditsMessage(credits, address);
-      } else if (!store.checkMemory(new Runnable() {
+      } else if (!store.checkMemory(new AtomicRunnable() {
          @Override
-         public void run() {
+         public void atomicRun() {
             callback.sendProducerCreditsMessage(credits, address);
          }
-      })) {
+      }, blockedRunnables::add)) {
          callback.sendProducerCreditsFailMessage(credits, address);
       }
    }
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
index 0345e1c7b2..937387241c 100644
--- 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
@@ -39,8 +39,12 @@ public class MemoryAssertions {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   /** most tests should have these as 0 after execution. */
    public static void basicMemoryAsserts() throws Exception {
+      basicMemoryAsserts(true);
+   }
+
+   /** most tests should have these as 0 after execution. */
+   public static void basicMemoryAsserts(boolean validateMessages) throws 
Exception {
       CheckLeak checkLeak = new CheckLeak();
       assertMemory(checkLeak, 0, OpenWireConnection.class.getName());
       assertMemory(checkLeak, 0, ProtonServerSenderContext.class.getName());
@@ -53,7 +57,9 @@ public class MemoryAssertions {
       assertMemory(checkLeak, 0, AMQPSessionContext.class.getName());
       assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());
       assertMemory(checkLeak, 0, RoutingContextImpl.class.getName());
-      assertMemory(checkLeak, 0, MessageReferenceImpl.class.getName());
+      if (validateMessages) {
+         assertMemory(checkLeak, 0, MessageReferenceImpl.class.getName());
+      }
    }
 
    public static void assertMemory(CheckLeak checkLeak, int maxExpected, 
String clazz) throws Exception {
diff --git 
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ProducerBlockedLeakTest.java
 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ProducerBlockedLeakTest.java
new file mode 100644
index 0000000000..3785a7705b
--- /dev/null
+++ 
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ProducerBlockedLeakTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.leak;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.SpawnedVMSupport;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerBlockedLeakTest extends ActiveMQTestBase {
+
+   private static final int OK = 100;
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   private static final String QUEUE_NAME = "TEST_BLOCKED_QUEUE";
+
+   ActiveMQServer server;
+
+   @BeforeClass
+   public static void beforeClass() throws Exception {
+      Assume.assumeTrue(CheckLeak.isLoaded());
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      server = createServer(true, createDefaultConfig(1, true));
+      server.getConfiguration().getAddressSettings().clear();
+      server.getConfiguration().getAddressSettings().put("#", new 
AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK).setMaxSizeMessages(10));
+      server.start();
+   }
+
+   @Test
+   public void testOPENWIRE() throws Exception {
+      testBlocked("OPENWIRE");
+   }
+
+   @Test
+   public void testCORE() throws Exception {
+      testBlocked("CORE");
+   }
+
+   @Test
+   public void testAMQP() throws Exception {
+      testBlocked("AMQP");
+   }
+
+   private void testBlocked(String protocol) throws Exception {
+      testBody(protocol);
+      MemoryAssertions.basicMemoryAsserts(false);
+      Queue queue = server.locateQueue(QUEUE_NAME);
+      queue.deleteAllReferences();
+      MemoryAssertions.basicMemoryAsserts(true);
+      server.stop();
+   }
+
+   // separating the test into a sub-method just to allow removing local 
references
+   // so they would be gone when basicMemoryAsserts is called
+   private void testBody(String protocol) throws Exception {
+      try (AssertionLoggerHandler loggerHandler = new 
AssertionLoggerHandler()) {
+         AtomicInteger messagesSent = new AtomicInteger(0);
+
+         server.addAddressInfo(new 
AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
+         server.createQueue(new 
QueueConfiguration(QUEUE_NAME).setAddress(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+
+         // clients need to be disconnected while blocked. For that reason a 
new VM is being spawned
+         Process process = 
SpawnedVMSupport.spawnVM(ProducerBlockedLeakTest.class.getName(), protocol, 
"10");
+
+         // checking the logs that the destination is blocked...
+         Wait.assertTrue(() -> loggerHandler.findText("AMQ222183"), 5000, 10);
+
+         process.destroyForcibly();
+         Assert.assertTrue(process.waitFor(10, TimeUnit.SECONDS));
+
+         // Making sure there are no connections anywhere in Acceptors or 
RemotingService.
+         // Just to speed up the test especially in OpenWire
+         server.getRemotingService().getConnections().forEach(c -> c.fail(new 
ActiveMQException("this is it!")));
+         Wait.assertEquals(0, () -> 
server.getRemotingService().getConnectionCount());
+         server.getRemotingService().getAcceptors().forEach((a, b) -> {
+            if (b instanceof NettyAcceptor) {
+               ((NettyAcceptor) b).getConnections().clear();
+            }
+         });
+      }
+   }
+
+   public static void main(String[] arg) {
+      String protocol = arg[0];
+      int threads = Integer.parseInt(arg[1]);
+      ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, 
"tcp://localhost:61616");
+      ExecutorService executorService = Executors.newFixedThreadPool(threads);
+
+      for (int i = 0; i < threads; i++) {
+         executorService.execute(() -> {
+            try {
+               Connection connection = factory.createConnection();
+               Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+               MessageProducer producer = 
session.createProducer(session.createQueue(QUEUE_NAME));
+               for (int send = 0; send < 100; send++) {
+                  producer.send(session.createTextMessage("hello"));
+                  session.commit();
+               }
+            } catch (Exception e) {
+               logger.warn(e.getMessage(), e);
+               Runtime.getRuntime().halt(-1);
+            }
+         });
+      }
+      try {
+         while (true) {
+            Thread.sleep(1000);
+         }
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+         Runtime.getRuntime().halt(-1);
+      }
+   }
+
+}
\ No newline at end of file
diff --git 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 3ab5633355..49722908fa 100644
--- 
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++ 
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.Message;
@@ -44,6 +45,7 @@ import 
org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -493,12 +495,12 @@ public class PersistMultiThreadTest extends 
ActiveMQTestBase {
       }
 
       @Override
-      public boolean checkMemory(boolean runOnFailure, Runnable runnable, 
Runnable ignoredRunnable) {
+      public boolean checkMemory(boolean runOnFailure, Runnable runnable, 
Runnable ignoredRunnable, Consumer<AtomicRunnable> accepted) {
          return false;
       }
 
       @Override
-      public boolean checkMemory(Runnable runnable) {
+      public boolean checkMemory(Runnable runnable, Consumer<AtomicRunnable> 
accepted) {
          return false;
       }
 
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 013f241cce..3c0835fa3c 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -1252,11 +1252,11 @@ public class PagingStoreImplTest extends 
ActiveMQTestBase {
          };
          store.applySetting(new 
AddressSettings().setMaxSizeBytes(1000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
          store.addSize(100);
-         store.checkMemory(trackMemoryChecks);
+         store.checkMemory(trackMemoryChecks, null);
          assertEquals(1, calls.get());
 
          store.block();
-         store.checkMemory(trackMemoryChecks);
+         store.checkMemory(trackMemoryChecks, null);
          assertEquals(1, calls.get());
 
          store.unblock();
@@ -1272,7 +1272,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase 
{
          assertEquals(100, store.getAddressLimitPercent());
 
          // address full blocks
-         store.checkMemory(trackMemoryChecks);
+         store.checkMemory(trackMemoryChecks, null);
          assertEquals(2, calls.get());
 
          store.block();
@@ -1300,7 +1300,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase 
{
          store.addSize(900);
          assertEquals(100, store.getAddressLimitPercent());
 
-         store.checkMemory(trackMemoryChecks);
+         store.checkMemory(trackMemoryChecks, null);
          assertEquals("no change", 3, calls.get());
          assertEquals("no change to be sure to be sure!", 3, calls.get());
 
@@ -1493,7 +1493,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase 
{
          // Do an initial check
          final CountingRunnable trackMemoryCheck1 = new CountingRunnable();
          assertEquals(0, trackMemoryCheck1.getCount());
-         store.checkMemory(trackMemoryCheck1);
+         store.checkMemory(trackMemoryCheck1, null);
          assertEquals(1, trackMemoryCheck1.getCount());
 
          // Do another check, this time indicate the disk is full during the 
first couple
@@ -1501,7 +1501,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase 
{
          final CountingRunnable trackMemoryCheck2 = new CountingRunnable();
          Mockito.when(mockManager.isDiskFull()).thenReturn(true, true, false);
          assertEquals(0, trackMemoryCheck2.getCount());
-         store.checkMemory(trackMemoryCheck2);
+         store.checkMemory(trackMemoryCheck2, null);
          assertEquals(1, trackMemoryCheck2.getCount());
 
          // Now run the released memory checks. The task should NOT execute 
again, verify it doesnt.


Reply via email to