This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 766e81d26e ARTEMIS-4505 Cleanup page transactions on startup of the broker
766e81d26e is described below

commit 766e81d26e222466e810c3d9d0608c2a939fe298
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Tue Nov 14 23:35:36 2023 -0500

    ARTEMIS-4505 Cleanup page transactions on startup of the broker
---
 .../artemis/core/paging/PageTransactionInfo.java   |   5 +
 .../core/paging/cursor/PageCursorProvider.java     |   2 +
 .../cursor/impl/PageCounterRebuildManager.java     |   7 +-
 .../paging/cursor/impl/PageCursorProviderImpl.java |  13 +-
 .../core/paging/impl/PageTransactionInfoImpl.java  |  15 +++
 .../core/paging/impl/PagingManagerImpl.java        |  87 ++++++++++++--
 .../artemis/core/server/ActiveMQServerLogger.java  |   9 ++
 .../core/server/impl/ActiveMQServerImpl.java       |   2 +-
 .../core/server/files/FileMoveManagerTest.java     |   2 +-
 .../paging/PageTransactionCleanupTest.java         | 133 +++++++++++++++++++++
 10 files changed, 258 insertions(+), 17 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
index e977c00ed0..9692e2b7a6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PageTransactionInfo.java
@@ -73,4 +73,9 @@ public interface PageTransactionInfo extends EncodingSupport {
     */
    boolean deliverAfterCommit(PageIterator pageIterator, PageSubscription 
cursor, PagedReference pagedMessage);
 
+   /** Used by the PageCounterRebuildManager and the PagingManager to clean up orphaned Page Transactions */
+   boolean isOrphaned();
+
+   PageTransactionInfo setOrphaned(boolean orphaned);
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
index b42049fe55..526289aa00 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageCursorProvider.java
@@ -67,4 +67,6 @@ public interface PageCursorProvider {
 
    void counterRebuildDone();
 
+   boolean isRebuildDone();
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
index 62d40de398..e4cac16264 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
@@ -39,10 +39,12 @@ import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 
 /** this class will copy current data from the Subscriptions, count messages 
while the server is already active
  * performing other activity */
+// TODO: Rename this as RebuildManager in a future major version
 public class PageCounterRebuildManager implements Runnable {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -58,7 +60,7 @@ public class PageCounterRebuildManager implements Runnable {
    private final Set<Long> storedLargeMessages;
 
 
-   public PageCounterRebuildManager(PagingManager pagingManager, PagingStore 
store, Map<Long, PageTransactionInfo> transactions, Set<Long> 
storedLargeMessages) {
+   public PageCounterRebuildManager(PagingManager pagingManager, PagingStore 
store, Map<Long, PageTransactionInfo> transactions, Set<Long> 
storedLargeMessages, AtomicLong minPageTXIDFound) {
       // we make a copy of the data because we are allowing data to influx. We 
will consolidate the values at the end
       initialize(store);
       this.pagingManager = pagingManager;
@@ -253,6 +255,9 @@ public class PageCounterRebuildManager implements Runnable {
 
                if (msg.getTransactionID() > 0) {
                   txInfo = transactions.get(msg.getTransactionID());
+                  if (txInfo != null) {
+                     txInfo.setOrphaned(false);
+                  }
                }
 
                Transaction preparedTX = txInfo == null ? null : 
txInfo.getPreparedTransaction();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index dea4cd5fbc..1cf6c1cd47 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -61,7 +61,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
 
    // We can't call cleanup before counters were rebuilt
    // as they will determine if a subscription is empty or not
-   protected volatile boolean countersRebuilt = true;
+   protected volatile boolean rebuildDone = true;
 
    protected final PagingStore pagingStore;
 
@@ -268,7 +268,7 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
 
    protected void cleanup() {
 
-      if (!countersRebuilt) {
+      if (!rebuildDone) {
          logger.debug("Counters were not rebuilt yet, cleanup has to be 
ignored on address {}", pagingStore != null ? pagingStore.getAddress() : 
"NULL");
          return;
       }
@@ -611,11 +611,16 @@ public class PageCursorProviderImpl implements 
PageCursorProvider {
 
    @Override
    public void counterRebuildStarted() {
-      this.countersRebuilt = false;
+      this.rebuildDone = false;
    }
 
    @Override
    public void counterRebuildDone() {
-      this.countersRebuilt = true;
+      this.rebuildDone = true;
+   }
+
+   @Override
+   public boolean isRebuildDone() {
+      return this.rebuildDone;
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
index 3bcdcf4d4f..2ec4d242a7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageTransactionInfoImpl.java
@@ -67,11 +67,26 @@ public final class PageTransactionInfoImpl implements 
PageTransactionInfo {
 
    private List<LateDelivery> lateDeliveries;
 
+   /** To be used by the RebuildManager.
+    *  Transactions that are not found while reading the pages remain marked as orphaned. */
+   private boolean orphaned;
+
    public PageTransactionInfoImpl(final long transactionID) {
       this();
       this.transactionID = transactionID;
    }
 
+   @Override
+   public boolean isOrphaned() {
+      return orphaned;
+   }
+
+   @Override
+   public PageTransactionInfoImpl setOrphaned(boolean orphaned) {
+      this.orphaned = orphaned;
+      return this;
+   }
+
    public PageTransactionInfoImpl() {
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
index 41773760be..b97c9448b3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingManagerImpl.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.paging.impl;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -27,6 +28,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import io.netty.util.collection.LongObjectHashMap;
@@ -37,6 +40,7 @@ import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
 import 
org.apache.activemq.artemis.core.paging.cursor.impl.PageCounterRebuildManager;
 import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -48,6 +52,7 @@ import 
org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.lang.invoke.MethodHandles;
 import java.util.function.BiConsumer;
 
@@ -56,6 +61,8 @@ import static 
org.apache.activemq.artemis.core.server.files.FileStoreMonitor.Fil
 
 public final class PagingManagerImpl implements PagingManager {
 
+   private static final int PAGE_TX_CLEANUP_PRINT_LIMIT = 1000;
+
    private static final int ARTEMIS_PAGING_COUNTER_SNAPSHOT_INTERVAL = 
Integer.valueOf(System.getProperty("artemis.paging.counter.snapshot.interval", 
"60"));
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -76,6 +83,8 @@ public final class PagingManagerImpl implements PagingManager 
{
 
    private final HierarchicalRepository<AddressSettings> 
addressSettingsRepository;
 
+   private final ActiveMQServer server;
+
    private PagingStoreFactory pagingStoreFactory;
 
    private volatile boolean globalFull;
@@ -125,7 +134,8 @@ public final class PagingManagerImpl implements 
PagingManager {
                             final HierarchicalRepository<AddressSettings> 
addressSettingsRepository,
                             final long maxSize,
                             final long maxMessages,
-                            final SimpleString managementAddress) {
+                            final SimpleString managementAddress,
+                            final ActiveMQServer server) {
       pagingStoreFactory = pagingSPI;
       this.addressSettingsRepository = addressSettingsRepository;
       addressSettingsRepository.registerListener(this);
@@ -138,14 +148,16 @@ public final class PagingManagerImpl implements 
PagingManager {
       globalSizeMetric.setUnderCallback(() -> setGlobalFull(false));
       this.managerExecutor = pagingSPI.newExecutor();
       this.managementAddress = managementAddress;
+      this.server = server;
    }
 
    SizeAwareMetric getSizeAwareMetric() {
       return globalSizeMetric;
    }
 
-
-   /** To be used in tests only called through PagingManagerTestAccessor */
+   /**
+    * To be used in tests only called through PagingManagerTestAccessor
+    */
    void resetMaxSize(long maxSize, long maxMessages) {
       this.maxSize = maxSize;
       this.maxMessages = maxMessages;
@@ -164,13 +176,13 @@ public final class PagingManagerImpl implements 
PagingManager {
 
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
                             final HierarchicalRepository<AddressSettings> 
addressSettingsRepository) {
-      this(pagingSPI, addressSettingsRepository, -1, -1, null);
+      this(pagingSPI, addressSettingsRepository, -1, -1, null, null);
    }
 
    public PagingManagerImpl(final PagingStoreFactory pagingSPI,
                             final HierarchicalRepository<AddressSettings> 
addressSettingsRepository,
                             final SimpleString managementAddress) {
-      this(pagingSPI, addressSettingsRepository, -1, -1, managementAddress);
+      this(pagingSPI, addressSettingsRepository, -1, -1, managementAddress, 
null);
    }
 
    @Override
@@ -325,7 +337,6 @@ public final class PagingManagerImpl implements 
PagingManager {
       }
    }
 
-
    @Override
    public boolean isGlobalFull() {
       return diskFull || maxSize > 0 && globalFull;
@@ -513,7 +524,6 @@ public final class PagingManagerImpl implements 
PagingManager {
       }
    }
 
-
    @Override
    public synchronized void stop() throws Exception {
       if (!started) {
@@ -583,22 +593,79 @@ public final class PagingManagerImpl implements 
PagingManager {
    public Future<Object> rebuildCounters(Set<Long> storedLargeMessages) {
       Map<Long, PageTransactionInfo> transactionsSet = new LongObjectHashMap();
       // making a copy
-      transactions.forEach(transactionsSet::put);
+      transactions.forEach((a, b) -> {
+         transactionsSet.put(a, b);
+         b.setOrphaned(true);
+      });
+      AtomicLong minLargeMessageID = new AtomicLong(Long.MAX_VALUE);
+
+      // make a copy of the stores
+      Map<SimpleString, PagingStore> currentStoreMap = new HashMap<>();
+      stores.forEach(currentStoreMap::put);
 
       if (logger.isDebugEnabled()) {
          logger.debug("Page Transactions during rebuildCounters:");
          transactionsSet.forEach((a, b) -> logger.debug("{} = {}", a, b));
       }
 
-      stores.forEach((address, pgStore) -> {
-         PageCounterRebuildManager rebuildManager = new 
PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages);
+      currentStoreMap.forEach((address, pgStore) -> {
+         PageCounterRebuildManager rebuildManager = new 
PageCounterRebuildManager(this, pgStore, transactionsSet, storedLargeMessages, 
minLargeMessageID);
          logger.debug("Setting destination {} to rebuild counters", address);
          managerExecutor.execute(rebuildManager);
       });
 
+      managerExecutor.execute(() -> cleanupPageTransactions(transactionsSet, 
currentStoreMap));
+
       FutureTask<Object> task = new FutureTask<>(() -> null);
       managerExecutor.execute(task);
 
       return task;
    }
+
+   /** Processes every page transaction still flagged as orphaned after the rebuild; skipped when no server was set or when a store has not finished rebuilding. */
+   private void cleanupPageTransactions(Map<Long, PageTransactionInfo> transactionSet, Map<SimpleString, PagingStore> currentStoreMap) {
+      if (server == null) {
+         logger.warn("Server attribute was not set, cannot proceed with page transaction cleanup");
+         return;
+      }
+      AtomicBoolean proceed = new AtomicBoolean(true);
+      // Checking that every pagingStore has finished rebuilding its page counter before touching any transaction.
+      // This would only be false if some exception happened on a previous execution of the rebuild manager; in that case the cleanup is not done.
+      currentStoreMap.forEach((a, b) -> {
+         if (!b.getCursorProvider().isRebuildDone()) {
+            logger.warn("cannot proceed on cleaning up page transactions as page cursor for {} is not done rebuilding it", b.getAddress());
+            proceed.set(false);
+         }
+      });
+
+      if (!proceed.get()) {
+         return;
+      }
+
+      AtomicLong txRemoved = new AtomicLong(0);
+
+      transactionSet.forEach((a, b) -> {
+         if (b.isOrphaned()) {
+            // NOTE(review): presumably consuming every message of the transaction makes onUpdate delete its record - confirm in PageTransactionInfoImpl
+            b.onUpdate(b.getNumberOfMessages(), server.getStorageManager(), this);
+            txRemoved.incrementAndGet();
+
+            // removals are printed ID by ID while the running count is below PAGE_TX_CLEANUP_PRINT_LIMIT
+            if (txRemoved.get() < PAGE_TX_CLEANUP_PRINT_LIMIT) {
+               ActiveMQServerLogger.LOGGER.removeOrphanedPageTransaction(a);
+            } else {
+               // after a while, only a running counter is printed to speed up things a bit
+               if (txRemoved.get() % PAGE_TX_CLEANUP_PRINT_LIMIT == 0) {
+                  ActiveMQServerLogger.LOGGER.cleaningOrphanedTXCleanup(txRemoved.get());
+               }
+            }
+         }
+      });
+
+      if (txRemoved.get() > 0) {
+         ActiveMQServerLogger.LOGGER.completeOrphanedTXCleanup(txRemoved.get());
+      } else {
+         logger.debug("Complete cleanupPageTransactions with no orphaned records found");
+      }
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 51b1580dac..966661365b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1593,4 +1593,13 @@ public interface ActiveMQServerLogger {
 
    @LogMessage(id = 224130, value = "The {} value {} will override the {} 
value {} since both are set.", level = LogMessage.Level.INFO)
    void configParamOverride(String overridingParam, Object 
overridingParamValue, String overriddenParam, Object overriddenParamValue);
+
+   @LogMessage(id = 224131, value = "Removing orphaned page transaction {}", 
level = LogMessage.Level.INFO)
+   void removeOrphanedPageTransaction(long transactionID);
+
+   @LogMessage(id = 224132, value = "{} orphaned page transactions were 
removed", level = LogMessage.Level.INFO)
+   void completeOrphanedTXCleanup(long numberOfPageTx);
+
+   @LogMessage(id = 224133, value = "{} orphaned page transactions have been 
removed", level = LogMessage.Level.INFO)
+   void cleaningOrphanedTXCleanup(long numberOfPageTx);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 82c88f4bf6..7f072eb1d4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -3048,7 +3048,7 @@ public class ActiveMQServerImpl implements ActiveMQServer 
{
 
    @Override
    public PagingManager createPagingManager() throws Exception {
-      return new PagingManagerImpl(getPagingStoreFactory(), 
addressSettingsRepository, configuration.getGlobalMaxSize(), 
configuration.getGlobalMaxMessages(), configuration.getManagementAddress());
+      return new PagingManagerImpl(getPagingStoreFactory(), 
addressSettingsRepository, configuration.getGlobalMaxSize(), 
configuration.getGlobalMaxMessages(), configuration.getManagementAddress(), 
this);
    }
 
    protected PagingStoreFactory getPagingStoreFactory() throws Exception {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
index 0991420ac2..4cd01b4bf8 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/files/FileMoveManagerTest.java
@@ -307,7 +307,7 @@ public class FileMoveManagerTest {
 
             PagingStoreFactoryNIO storeFactory = new 
PagingStoreFactoryNIO(storageManager, dataLocation, 100, null, new 
OrderedExecutorFactory(threadPool),  new OrderedExecutorFactory(threadPool), 
true, null);
 
-            PagingManagerImpl managerImpl = new 
PagingManagerImpl(storeFactory, addressSettings, -1, -1, 
ActiveMQDefaultConfiguration.getDefaultManagementAddress());
+            PagingManagerImpl managerImpl = new 
PagingManagerImpl(storeFactory, addressSettings, -1, -1, 
ActiveMQDefaultConfiguration.getDefaultManagementAddress(), null);
 
             managerImpl.start();
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java
new file mode 100644
index 0000000000..5cf6c8b591
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageTransactionCleanupTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.paging.impl.PagingStoreImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A PageTransactionCleanupTest.
+ * <br>
+ * Verifies that orphaned page transactions are removed when the broker is restarted after the page files of one queue were deleted.
+ */
+public class PageTransactionCleanupTest extends ActiveMQTestBase {
+
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final int PAGE_MAX = 100 * 1024;
+
+   private static final int PAGE_SIZE = 10 * 1024;
+
+   static final SimpleString ADDRESS = new SimpleString("TestQueue");
+
+   @Test
+   public void testPageTXCleanup() throws Throwable {
+
+      Configuration config = 
createDefaultConfig(true).setJournalSyncNonTransactional(false);
+
+      ActiveMQServer server = createServer(true, config, PAGE_SIZE, PAGE_MAX, 
new HashMap<String, AddressSettings>());
+
+      server.start();
+
+      Queue queue1 = server.createQueue(new 
QueueConfiguration("test1").setRoutingType(RoutingType.ANYCAST));
+      Queue queue2 = server.createQueue(new 
QueueConfiguration("test2").setRoutingType(RoutingType.ANYCAST));
+
+      queue1.getPagingStore().startPaging();
+      queue2.getPagingStore().startPaging();
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("CORE", 
"tcp://localhost:61616");
+
+      final int NUMBER_OF_MESSAGES = 30;
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+         for (int producerID = 1; producerID <= 2; producerID++) {
+            MessageProducer producer = 
session.createProducer(session.createQueue("test" + producerID));
+            for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+               producer.send(session.createTextMessage("hello " + i));
+               session.commit();
+            }
+         }
+      }
+
+      PagingStoreImpl store = (PagingStoreImpl) queue1.getPagingStore();
+      File folder = store.getFolder();
+
+      server.stop();
+
+      for (String fileName : folder.list((dir, f) -> f.endsWith(".page"))) {
+         File fileToRemove = new File(folder, fileName);
+         fileToRemove.delete();
+         logger.debug("removing file {}", fileToRemove);
+      }
+
+      try (AssertionLoggerHandler handler = new AssertionLoggerHandler()) {
+         server.start();
+         Wait.assertTrue(() -> handler.findText("AMQ224132"));
+      }
+
+      
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(60_000);
+
+      HashMap<Integer, AtomicInteger> countedJournal = 
countJournal(server.getConfiguration());
+      Assert.assertEquals(NUMBER_OF_MESSAGES, countedJournal.get(35).get());
+
+      server.stop();
+      server.start();
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(true, 
Session.AUTO_ACKNOWLEDGE);
+         connection.start();
+         for (int producerID = 1; producerID <= 2; producerID++) {
+            MessageConsumer consumer = 
session.createConsumer(session.createQueue("test" + producerID));
+            for (int i = 0; i < (producerID == 1 ? 0 : NUMBER_OF_MESSAGES); 
i++) {
+               TextMessage message = (TextMessage) consumer.receive(5000);
+               Assert.assertNotNull("message not received on producer + " + 
producerID + ", message " + i, message);
+               Assert.assertEquals("could not find message " + i + " on 
producerID=" + producerID, "hello " + i, message.getText());
+               session.commit();
+            }
+            Assert.assertNull(consumer.receiveNoWait());
+            consumer.close();
+         }
+      }
+   }
+
+}

Reply via email to