This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new bd2a7402cb ARTEMIS-5579 Address is unavailable to create queues during 
page cleanup
bd2a7402cb is described below

commit bd2a7402cbd8ea591b64507aa0fd7199583a0f70
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Jul 17 13:04:00 2025 -0400

    ARTEMIS-5579 Address is unavailable to create queues during page cleanup
---
 .../paging/cursor/impl/PageCursorProviderImpl.java |  83 +++++++--------
 .../artemis/tests/db/paging/PagingTest.java        | 118 +++++++++++++++++++++
 2 files changed, 156 insertions(+), 45 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index 050f3a1314..c52a127304 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -84,16 +84,13 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
          throw new IllegalStateException("Cursor " + cursorID + " had already 
been created");
       }
 
-
       PageSubscriptionCounter subscriptionCounter = 
createPageCounter(cursorID, persistent);
       PageSubscription activeCursor = new PageSubscriptionImpl(this, 
pagingStore, storageManager, filter, cursorID, persistent, subscriptionCounter);
 
-
       activeCursors.put(cursorID, activeCursor);
       return activeCursor;
    }
 
-
    private PageSubscriptionCounter createPageCounter(long cursorID, boolean 
persistent) {
       return new PageSubscriptionCounterImpl(storageManager, cursorID);
    }
@@ -272,7 +269,6 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
       // I tried to simplify the locks but each PageStore has its own lock, so 
this was the best option
       // I found in order to fix 
https://issues.apache.org/jira/browse/ARTEMIS-3054
       try (ArtemisCloseable readLock = storageManager.closeableReadLock()) {
-
          while (true) {
             if (pagingStore.writeLock(1_000)) {
                break;
@@ -283,57 +279,54 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
 
          logger.trace(">>>> Cleanup {}", this.pagingStore.getAddress());
 
-         synchronized (this) {
-            try {
-               if (!pagingStore.isStarted()) {
-                  logger.trace("Paging store is not started");
-                  return;
-               }
+         try {
+            if (!pagingStore.isStarted()) {
+               logger.trace("Paging store is not started");
+               return;
+            }
 
-               if (!pagingStore.isPaging()) {
-                  logger.trace("Paging Store was not paging, so no reason to 
retry the cleanup");
-                  return;
-               }
+            if (!pagingStore.isPaging()) {
+               logger.trace("Paging Store was not paging, so no reason to 
retry the cleanup");
+               return;
+            }
 
-               List<PageSubscription> cursorList = cloneSubscriptions();
+            List<PageSubscription> cursorList = cloneSubscriptions();
 
-               long minPage = checkMinPage(cursorList);
-               final long firstPage = pagingStore.getFirstPage();
-               deliverIfNecessary(cursorList, minPage);
+            long minPage = checkMinPage(cursorList);
+            final long firstPage = pagingStore.getFirstPage();
+            deliverIfNecessary(cursorList, minPage);
 
-               if (logger.isTraceEnabled()) {
-                  logger.trace("firstPage={}, minPage={}, 
currentWritingPage={}", firstPage, minPage, 
pagingStore.getCurrentWritingPage());
-               }
+            if (logger.isTraceEnabled()) {
+               logger.trace("firstPage={}, minPage={}, currentWritingPage={}", 
firstPage, minPage, pagingStore.getCurrentWritingPage());
+            }
 
-               // First we cleanup regular streaming, at the beginning of set 
of files
-               cleanupRegularStream(depagedPages, depagedPagesSet, cursorList, 
minPage, firstPage);
+            // First we cleanup regular streaming, at the beginning of set of 
files
+            cleanupRegularStream(depagedPages, depagedPagesSet, cursorList, 
minPage, firstPage);
 
-               // Then we do some check on eventual pages that can be already 
removed but they are away from the streaming
-               cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, 
minPage, firstPage);
+            // Then we do some check on eventual pages that can be already 
removed but they are away from the streaming
+            cleanupMiddleStream(depagedPages, depagedPagesSet, cursorList, 
minPage, firstPage);
 
-               if (pagingStore.isPageFull()) {
-                  checkClearPageLimit();
-               }
+            if (pagingStore.isPageFull()) {
+               checkClearPageLimit();
+            }
 
-               assert pagingStore.getNumberOfPages() >= 0;
+            assert pagingStore.getNumberOfPages() >= 0;
 
-               if (!pagingStore.hasPendingIO() && 
(pagingStore.getNumberOfPages() == 0 || pagingStore.getNumberOfPages() == 1 && 
(pagingStore.getCurrentPage() == null || 
pagingStore.getCurrentPage().getNumberOfMessages() == 0))) {
-                  logger.trace("StopPaging being called on {}, pending={}", 
pagingStore, pagingStore.hasPendingIO());
-                  pagingStore.stopPaging();
-               } else {
-                  if (logger.isTraceEnabled()) {
-                     logger.trace("Couldn't cleanup page on address {} as 
numberOfPages == {}  and currentPage.numberOfMessages = {}",
-                        pagingStore.getAddress(), 
pagingStore.getNumberOfPages(), 
pagingStore.getCurrentPage().getNumberOfMessages());
-                  }
+            if (!pagingStore.hasPendingIO() && (pagingStore.getNumberOfPages() 
== 0 || pagingStore.getNumberOfPages() == 1 && (pagingStore.getCurrentPage() == 
null || pagingStore.getCurrentPage().getNumberOfMessages() == 0))) {
+               logger.trace("StopPaging being called on {}, pending={}", 
pagingStore, pagingStore.hasPendingIO());
+               pagingStore.stopPaging();
+            } else {
+               if (logger.isTraceEnabled()) {
+                  logger.trace("Couldn't cleanup page on address {} as 
numberOfPages == {}  and currentPage.numberOfMessages = {}", 
pagingStore.getAddress(), pagingStore.getNumberOfPages(), 
pagingStore.getCurrentPage().getNumberOfMessages());
                }
-            } catch (Throwable ex) {
-               
ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(pagingStore.getAddress(),
 ex);
-               logger.warn(ex.getMessage(), ex);
-               return;
-            } finally {
-               logger.trace("<<<< Cleanup end on {}", 
pagingStore.getAddress());
-               pagingStore.writeUnlock();
             }
+         } catch (Throwable ex) {
+            
ActiveMQServerLogger.LOGGER.problemCleaningPageAddress(pagingStore.getAddress(),
 ex);
+            logger.warn(ex.getMessage(), ex);
+            return;
+         } finally {
+            logger.trace("<<<< Cleanup end on {}", pagingStore.getAddress());
+            pagingStore.writeUnlock();
          }
       }
       finishCleanup(depagedPages);
@@ -344,7 +337,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
     * This cleanup process will calculate the min page for every cursor and 
then we remove the pages based on that. if
     * we knew ahead all the queues belonging to every page we could remove 
this process.
     */
-   private void cleanupRegularStream(List<Page> depagedPages,
+   protected void cleanupRegularStream(List<Page> depagedPages,
                           LongHashSet depagedPagesSet,
                           List<PageSubscription> cursorList,
                           long minPage,
diff --git 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
index 5f8e057849..633086ece1 100644
--- 
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
+++ 
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
@@ -52,6 +52,7 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -96,6 +97,7 @@ import 
org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import 
org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderImpl;
 import 
org.apache.activemq.artemis.core.paging.cursor.impl.PageCursorProviderTestAccessor;
 import org.apache.activemq.artemis.core.paging.cursor.impl.PagePositionImpl;
+import org.apache.activemq.artemis.core.paging.impl.Page;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryDatabase;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreFactoryNIO;
 import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
@@ -124,6 +126,7 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.Wait;
 import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.collections.LongHashSet;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
@@ -3827,6 +3830,121 @@ public class PagingTest extends ParameterDBTestBase {
 
    }
 
+   // The pages are complete, and this is simulating a scenario where the 
server crashed before deleting the pages.
+   @TestTemplate
+   public void testCreateQueueWhileCleanup() throws Exception {
+
+      Configuration config = createDefaultInVMConfig();
+
+      CountDownLatch letGoLatch = new CountDownLatch(1);
+      runAfter(letGoLatch::countDown);
+      CountDownLatch startFlag = new CountDownLatch(1);
+      class InterruptedCursorProvider extends PageCursorProviderImpl {
+
+         InterruptedCursorProvider(PagingStore pagingStore, StorageManager 
storageManager) {
+            super(pagingStore, storageManager);
+         }
+
+         @Override
+         protected void cleanupRegularStream(List<Page> depagedPages,
+                                             LongHashSet depagedPagesSet,
+                                             List<PageSubscription> cursorList,
+                                             long minPage,
+                                             long firstPage) throws Exception {
+            super.cleanupRegularStream(depagedPages, depagedPagesSet, 
cursorList, minPage, firstPage);
+            startFlag.countDown();
+            for (int i = 0; i < 60; i++) {
+               if (!letGoLatch.await(1, TimeUnit.SECONDS)) {
+                  logger.warn("letGoLatch still unreleased");
+               }
+            }
+         }
+      }
+
+      server = new ActiveMQServerImpl(config, 
ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
+         @Override
+         protected PagingStoreFactoryNIO getPagingStoreFactory() {
+            return new PagingStoreFactoryNIO(this.getStorageManager(), 
this.getConfiguration().getPagingLocation(), 
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(), 
this.getExecutorFactory(), 
this.getConfiguration().isJournalSyncNonTransactional(), null) {
+               @Override
+               public PageCursorProvider newCursorProvider(PagingStore store,
+                                                           StorageManager 
storageManager,
+                                                           AddressSettings 
addressSettings,
+                                                           ArtemisExecutor 
executor) {
+                  return new InterruptedCursorProvider(store, storageManager);
+               }
+            };
+         }
+
+      };
+
+      addServer(server);
+
+      AddressSettings defaultSetting = new 
AddressSettings().setPageSizeBytes(PagingTest.PAGE_SIZE).setMaxSizeBytes(0).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE).setMaxReadPageBytes(-1);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      server.start();
+
+      
locator.setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);
+
+      sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession(true, true, 0);
+
+      String queue1Name = "Queue1";
+      String queue2Name = "Queue2";
+
+
+      server.addAddressInfo(new 
AddressInfo(ADDRESS).addRoutingType(RoutingType.MULTICAST));
+
+      
session.createQueue(QueueConfiguration.of(queue1Name).setAddress(ADDRESS).setRoutingType(RoutingType.MULTICAST));
+
+      final Queue queue1 = server.locateQueue(queue1Name);
+
+      queue1.getPageSubscription().getPagingStore().startPaging();
+
+      ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+      ClientMessage message;
+
+      for (int i = 0; i < 20; i++) {
+         message = session.createMessage(true);
+
+         ActiveMQBuffer bodyLocal = message.getBodyBuffer();
+
+         bodyLocal.writeBytes(new byte[100 * 4]);
+
+         message.putIntProperty(SimpleString.of("idi"), i);
+
+         producer.send(message);
+         session.commit();
+      }
+
+      Wait.assertTrue(queue1.getPagingStore()::isPaging);
+      Future<Boolean> doneCleanup = 
queue1.getPagingStore().getCursorProvider().scheduleCleanup();
+
+      assertTrue(startFlag.await(5, TimeUnit.SECONDS));
+
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      runAfter(executorService::shutdownNow);
+      CountDownLatch created = new CountDownLatch(1);
+      executorService.execute(() -> {
+         try {
+            
server.createQueue(QueueConfiguration.of(queue2Name).setAddress(ADDRESS));
+            created.countDown();
+         } catch (Exception e) {
+            logger.warn(e.getMessage(), e);
+         }
+      });
+
+      assertTrue(created.await(5, TimeUnit.SECONDS));
+
+      assertNotNull(server.locateQueue(queue2Name));
+      letGoLatch.countDown();
+      assertTrue(doneCleanup.get(10, TimeUnit.SECONDS));
+
+      server.stop();
+   }
+
    @TestTemplate
    public void testPartialConsume() throws Exception {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to