This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 677bb96641 ARTEMIS-5608 Option to purge paging folders during page
cleanup
677bb96641 is described below
commit 677bb96641a3a1373a4cca67cd3b57db01263097
Author: Clebert Suconic <[email protected]>
AuthorDate: Thu Jul 31 18:00:04 2025 -0400
ARTEMIS-5608 Option to purge paging folders during page cleanup
---
.../activemq/artemis/cli/commands/Create.java | 12 +
.../artemis/cli/commands/helper/HelperCreate.java | 14 ++
.../activemq/artemis/cli/commands/etc/broker.xml | 3 +
.../api/config/ActiveMQDefaultConfiguration.java | 6 +-
.../core/io/AbstractSequentialFileFactory.java | 12 +-
.../artemis/core/io/SequentialFileFactory.java | 4 +
.../artemis/core/config/Configuration.java | 7 +
.../core/config/impl/ConfigurationImpl.java | 13 +
.../deployers/impl/FileConfigurationParser.java | 2 +
.../core/management/impl/AddressControlImpl.java | 2 +-
.../activemq/artemis/core/paging/PagingStore.java | 13 +
.../core/paging/cursor/PageSubscription.java | 4 +
.../cursor/impl/PageCounterRebuildManager.java | 2 +-
.../paging/cursor/impl/PageCursorProviderImpl.java | 2 +-
.../paging/cursor/impl/PageSubscriptionImpl.java | 74 +++---
.../artemis/core/paging/impl/PageCache.java | 4 +
.../core/paging/impl/PagingStoreFactoryNIO.java | 20 +-
.../artemis/core/paging/impl/PagingStoreImpl.java | 122 +++++++++-
.../core/postoffice/impl/PostOfficeImpl.java | 2 +-
.../artemis/core/server/ActiveMQServerLogger.java | 6 +
.../core/server/impl/ActiveMQServerImpl.java | 3 +-
.../artemis/core/server/impl/QueueImpl.java | 8 +-
.../resources/schema/artemis-configuration.xsd | 9 +
.../core/config/impl/ConfigurationImplTest.java | 1 +
.../config/impl/FileConfigurationParserTest.java | 31 +++
.../management/impl/ManagementServiceImplTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
.../activemq/artemis/utils/TestParameters.java | 14 +-
.../activemq/artemis/tests/db/common/Database.java | 4 +-
.../artemis/tests/db/paging/PagingTest.java | 42 ++--
.../failover/PrimaryCrashOnBackupSyncTest.java | 4 +-
.../failover/ReplicatedPagedFailoverTest.java | 52 ++++
.../integration/paging/PageCleanupFolderTest.java | 159 +++++++++++++
.../storage/PersistMultiThreadTest.java | 5 +
.../resources/servers/horizontalPaging/broker.xml | 264 ---------------------
.../tests/soak/paging/HorizontalPagingTest.java | 91 ++++---
.../core/paging/impl/PageTimedWriterUnitTest.java | 2 +-
37 files changed, 646 insertions(+), 369 deletions(-)
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
index 9235cd473d..55028957f3 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/Create.java
@@ -156,6 +156,8 @@ public class Create extends InstallAbstract {
@Option(names = "--force", description = "Overwrite configuration at
destination directory.")
private boolean force;
+ @Option(names = "--purge-page-folders", description = "Paged folders should
be removed upon leaving page state.")
+ private boolean purgePageFolders;
@Option(names = "--clustered", description = "Enable clustering.")
private boolean clustered = false;
@@ -360,6 +362,14 @@ public class Create extends InstallAbstract {
return host;
}
+ public boolean isPurgePageFolders() {
+ return purgePageFolders;
+ }
+
+ public void setPurgePageFolders(boolean purgePageFolders) {
+ this.purgePageFolders = purgePageFolders;
+ }
+
public void setHost(String host) {
this.host = host;
}
@@ -601,6 +611,8 @@ public class Create extends InstallAbstract {
filters.put("${persistence-enabled}", isDisablePersistence() ? "false" :
"true");
+ filters.put("${purge-page-folders}", isPurgePageFolders() ? "true" :
"false");
+
if (ping != null && !ping.isEmpty()) {
filters.put("${ping}", ping);
filters.put("${ping-config.settings}", readTextFile(ETC_PING_TXT,
filters));
diff --git
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/helper/HelperCreate.java
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/helper/HelperCreate.java
index 3f59777dbb..3bd744fb06 100644
---
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/helper/HelperCreate.java
+++
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/helper/HelperCreate.java
@@ -74,6 +74,8 @@ public class HelperCreate extends HelperBase {
private String staticCluster;
+ private boolean purgePageFolders;
+
String dataFolder = "./data";
private boolean failoverOnShutdown = false;
@@ -136,6 +138,14 @@ public class HelperCreate extends HelperBase {
return this;
}
+ public boolean isPurgePageFolders() {
+ return purgePageFolders;
+ }
+
+ public void setPurgePageFolders(boolean purgePageFolders) {
+ this.purgePageFolders = purgePageFolders;
+ }
+
public int getPortOffset() {
return portOffset;
}
@@ -286,6 +296,10 @@ public class HelperCreate extends HelperBase {
add(listCommands, "--require-login");
}
+ if (purgePageFolders) {
+ add(listCommands, "--purge-page-folders");
+ }
+
if (staticCluster != null) {
add(listCommands, "--static-cluster", staticCluster);
}
diff --git
a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
index f032756064..b0659a995e 100644
---
a/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
+++
b/artemis-cli/src/main/resources/org/apache/activemq/artemis/cli/commands/etc/broker.xml
@@ -42,6 +42,9 @@ ${jdbc}
-->
<journal-type>${journal.settings}</journal-type>
+ <!-- Set this attribute to true to enable the system to purge the paging
folder once it exits the paging state -->
+ <purge-page-folders>${purge-page-folders}</purge-page-folders>
+
<paging-directory>${data.dir}/paging</paging-directory>
<bindings-directory>${data.dir}/bindings</bindings-directory>
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 18e3f17b91..4f71d57233 100644
---
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -717,10 +717,10 @@ public final class ActiveMQDefaultConfiguration {
private static final boolean DEFAULT_MIRROR_ACK_MANAGER_WARN_UNACKED =
false;
private static final boolean DEFAULT_MIRROR_PAGE_TRANSACTION = false;
+ private static final boolean DEFAULT_PURGE_PAGE_FOLDERS = false;
private static final int DEFAULT_CLUSTER_TOPOLOGY_SCANNER_ATTEMPTS = 30;
-
/**
* If {@code true} then the ActiveMQ Artemis Server will make use of any
Protocol Managers that are in available on
* the classpath. If false then only the core protocol will be available,
unless in Embedded mode where users can
@@ -2013,6 +2013,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_MIRROR_PAGE_TRANSACTION;
}
+ public static boolean getPurgePageFolders() {
+ return DEFAULT_PURGE_PAGE_FOLDERS;
+ }
+
/**
* the initial size of the intermediate message buffer used for queues
*/
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
index 046f73b667..a360c767be 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/AbstractSequentialFileFactory.java
@@ -244,7 +244,13 @@ public abstract class AbstractSequentialFileFactory
implements SequentialFileFac
@Override
public List<String> listFiles(final String extension) throws Exception {
- FilenameFilter fnf = (file, name) -> name.endsWith("." + extension);
+ FilenameFilter fnf;
+
+ if (extension != null) {
+ fnf = (file, name) -> name.endsWith("." + extension);
+ } else {
+ fnf = null;
+ }
String[] fileNames = journalDir.list(fnf);
@@ -255,4 +261,8 @@ public abstract class AbstractSequentialFileFactory
implements SequentialFileFac
return Arrays.asList(fileNames);
}
+ @Override
+ public boolean deleteFolder() {
+ return this.journalDir.delete();
+ }
}
diff --git
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
index 390339ca40..c999d936f4 100644
---
a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
+++
b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/SequentialFileFactory.java
@@ -131,6 +131,10 @@ public interface SequentialFileFactory {
void stop();
+ default boolean deleteFolder() {
+ return false;
+ }
+
/**
* Creates the directory if it does not exist yet.
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index 3526d0af76..1effd74b11 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -1526,6 +1526,13 @@ public interface Configuration {
boolean isMirrorAckManagerWarnUnacked();
+ /**
+ * Whether the system should remove page folders once destinations stop paging.
+ * Default is false; however, future major versions will have this as true
*/
+ Configuration setPurgePageFolders(boolean purgePageFolders);
+
+ boolean isPurgePageFolders();
+
void exportAsProperties(File to) throws Exception;
default boolean isUsingDatabasePersistence() {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index c5e89107e2..6c5738a13b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -472,6 +472,8 @@ public class ConfigurationImpl implements Configuration,
Serializable {
private int globalMaxSizePercentOfJvmMaxMemory =
ActiveMQDefaultConfiguration.DEFAULT_GLOBAL_MAX_MEMORY_PERCENT;
+ private boolean purgePageFolders =
ActiveMQDefaultConfiguration.getPurgePageFolders();
+
/**
* Parent folder for all data folders.
*/
@@ -3397,6 +3399,17 @@ public class ConfigurationImpl implements Configuration,
Serializable {
return this;
}
+ @Override
+ public ConfigurationImpl setPurgePageFolders(boolean purgePageFolders) {
+ this.purgePageFolders = purgePageFolders;
+ return this;
+ }
+
+ @Override
+ public boolean isPurgePageFolders() {
+ return purgePageFolders;
+ }
+
@Override
public int getMirrorAckManagerPageAttempts() {
return this.mirrorAckManagerPageAttempts;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index fb84d04067..264be223aa 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -764,6 +764,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
config.setPagingDirectory(getString(e, "paging-directory",
config.getPagingDirectory(), NOT_NULL_OR_EMPTY));
+ config.setPurgePageFolders(getBoolean(e, "purge-page-folders",
config.isPurgePageFolders()));
+
config.setCreateJournalDir(getBoolean(e, "create-journal-dir",
config.isCreateJournalDir()));
String s = getString(e, "journal-type",
config.getJournalType().toString(), JOURNAL_TYPE);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
index 4c28a14315..4a8ade272d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/AddressControlImpl.java
@@ -490,7 +490,7 @@ public class AddressControlImpl extends AbstractControl
implements AddressContro
try {
final PagingStore pageStore = getPagingStore();
- if (pageStore == null || !pageStore.isPaging()) {
+ if (pageStore == null || !pageStore.isStorePaging()) {
return 0;
} else {
return pageStore.getNumberOfPages();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
index 29f5a5faf7..ba1b50ef26 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging;
import java.io.File;
import java.util.Collection;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -115,6 +116,11 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
*/
boolean isPaging();
+ /**
+ * {@code isPaging()} might also return true in other situations, for example if it's DROPPING.
+ * This method will only return true if there are files on the page system */
+ boolean isStorePaging();
+
/**
* Performs a real sync on the current IO file.
*/
@@ -179,8 +185,15 @@ public interface PagingStore extends ActiveMQComponent,
RefCountMessageListener
*/
boolean startPaging() throws Exception;
+ default boolean startPaging(long timeout, TimeUnit unit) throws Exception {
+ return startPaging();
+ }
+
void stopPaging() throws Exception;
+ default void purgeFolder() {
+ }
+
/**
* Add size to this {@code PageStore}.
*
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
index c41c964730..57fe49ab6b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PageSubscription.java
@@ -38,6 +38,8 @@ public interface PageSubscription {
*/
void counterSnapshot();
+ void deleteCursorInfo();
+
/**
* This is a callback to inform the PageSubscription that something was
routed, so the empty flag can be cleared
*/
@@ -62,6 +64,8 @@ public interface PageSubscription {
*/
boolean isPaging();
+ boolean isStorePaging();
+
PageIterator iterator();
PageIterator iterator(boolean browsing);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
index de0bd19e09..5b0c09a99e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCounterRebuildManager.java
@@ -79,7 +79,7 @@ public class PageCounterRebuildManager implements Runnable {
store.writeLock();
try {
try {
- paging = store.isPaging();
+ paging = store.isStorePaging();
if (!paging) {
logger.trace("Destination {} was not paging, no need to rebuild
counters", store.getAddress());
store.getCursorProvider().forEachSubscription(subscription -> {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
index c52a127304..47c44ea6a6 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageCursorProviderImpl.java
@@ -285,7 +285,7 @@ public class PageCursorProviderImpl implements
PageCursorProvider {
return;
}
- if (!pagingStore.isPaging()) {
+ if (!pagingStore.isStorePaging()) {
logger.trace("Paging Store was not paging, so no reason to
retry the cleanup");
return;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
index 386e7bbbdc..59128979c0 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java
@@ -141,6 +141,11 @@ public final class PageSubscriptionImpl implements
PageSubscription {
return pageStore.isPaging();
}
+ @Override
+ public boolean isStorePaging() {
+ return pageStore.isStorePaging();
+ }
+
@Override
public void setQueue(Queue queue) {
this.queue = queue;
@@ -720,6 +725,37 @@ public final class PageSubscriptionImpl implements
PageSubscription {
}
}
+ @Override
+ public void deleteCursorInfo() {
+ logger.debug("Delete consumed Paged on {} with consumedPages size = {}",
pageStore.getAddress(), consumedPages.size());
+ consumedPages.forEach(this::deleteCursorInfo);
+ consumedPages.clear();
+ }
+
+ private void deleteCursorInfo(Long page, PageCursorInfo info) {
+ logger.debug("Deleting cursorInfo for page {} info={}, address={}",
page, info, pageStore.getAddress());
+ PagePosition completeInfo = info.getCompleteInfo();
+ if (completeInfo != null) {
+ try {
+ store.deletePageComplete(completeInfo.getRecordID());
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.errorDeletingPageCompleteRecord(e);
+ }
+ info.setCompleteInfo(null);
+ }
+ if (info.acks != null) {
+ for (PagePosition deleteInfo : info.acks.values()) {
+ if (deleteInfo.getRecordID() >= 0) {
+ try {
+ store.deleteCursorAcknowledge(deleteInfo.getRecordID());
+ } catch (Exception e) {
+
ActiveMQServerLogger.LOGGER.errorDeletingPageCompleteRecord(e);
+ }
+ }
+ }
+ }
+ }
+
@Override
public void onDeletePage(Page deletedPage) throws Exception {
logger.debug("removing page {}", deletedPage);
@@ -728,26 +764,7 @@ public final class PageSubscriptionImpl implements
PageSubscription {
info = consumedPages.remove(deletedPage.getPageId());
}
if (info != null) {
- PagePosition completeInfo = info.getCompleteInfo();
- if (completeInfo != null) {
- try {
- store.deletePageComplete(completeInfo.getRecordID());
- } catch (Exception e) {
- ActiveMQServerLogger.LOGGER.errorDeletingPageCompleteRecord(e);
- }
- info.setCompleteInfo(null);
- }
- if (info.acks != null) {
- for (PagePosition deleteInfo : info.acks.values()) {
- if (deleteInfo.getRecordID() >= 0) {
- try {
- store.deleteCursorAcknowledge(deleteInfo.getRecordID());
- } catch (Exception e) {
-
ActiveMQServerLogger.LOGGER.errorDeletingPageCompleteRecord(e);
- }
- }
- }
- }
+ deleteCursorInfo(deletedPage.getPageId(), info);
}
deletedPage.usageExhaust();
}
@@ -935,17 +952,13 @@ public final class PageSubscriptionImpl implements
PageSubscription {
" numberOfMessage = " +
numberOfMessages +
", confirmed = " +
- confirmed +
- ", isDone=" +
- this.isDone();
+ confirmed;
} catch (Exception e) {
return "PageCursorInfo::pageNr=" + pageId +
" numberOfMessage = " +
numberOfMessages +
", confirmed = " +
- confirmed +
- ", isDone=" +
- e.toString();
+ confirmed + ", exception = " + e;
}
}
@@ -990,6 +1003,10 @@ public final class PageSubscriptionImpl implements
PageSubscription {
PageSubscriptionImpl.this, pageId, completePage != null,
getNumberOfMessages(), confirmed.get(), pendingTX.get());
}
+ if (!pageStore.isStorePaging()) {
+ return true;
+ }
+
// in cases where the file was damaged it is possible to get more
confirmed records than we actually had messages
// for that case we set confirmed.get() >= getNumberOfMessages
instead of ==
return completePage != null || (confirmed.get() >=
getNumberOfMessages() && pendingTX.get() == 0);
@@ -1081,6 +1098,9 @@ public final class PageSubscriptionImpl implements
PageSubscription {
}
private int getNumberOfMessages() {
+ if (!pageStore.isStorePaging()) {
+ return 0;
+ }
if (numberOfMessages < 0) {
try {
Page page = pageStore.usePage(pageId, true, false);
@@ -1408,7 +1428,7 @@ public final class PageSubscriptionImpl implements
PageSubscription {
return NextResult.hasElements;
}
- if (!pageStore.isPaging()) {
+ if (!pageStore.isStorePaging()) {
return NextResult.noElements;
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageCache.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageCache.java
index 84990bd8df..6b2155fe17 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageCache.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PageCache.java
@@ -68,5 +68,9 @@ public class PageCache {
}
}
+ public synchronized void clear() {
+ usedPages.clear();
+ }
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
index 20dbf7b5c7..bbdbe7c60e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreFactoryNIO.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
@@ -74,6 +75,8 @@ public class PagingStoreFactoryNIO implements
PagingStoreFactory {
private final IOCriticalErrorListener critialErrorListener;
+ private final Supplier<Boolean> purgePageFolders;
+
public File getDirectory() {
return directory;
}
@@ -109,6 +112,17 @@ public class PagingStoreFactoryNIO implements
PagingStoreFactory {
final ExecutorFactory executorFactory,
final boolean syncNonTransactional,
final IOCriticalErrorListener
critialErrorListener) {
+ this(storageManager, directory, syncTimeout, scheduledExecutor,
executorFactory, syncNonTransactional, critialErrorListener, () -> false);
+ }
+
+ public PagingStoreFactoryNIO(final StorageManager storageManager,
+ final File directory,
+ final long syncTimeout,
+ final ScheduledExecutorService
scheduledExecutor,
+ final ExecutorFactory executorFactory,
+ final boolean syncNonTransactional,
+ final IOCriticalErrorListener
critialErrorListener,
+ final Supplier<Boolean> purgePageFolders) {
this.storageManager = storageManager;
this.directory = directory;
this.executorFactory = executorFactory;
@@ -116,6 +130,7 @@ public class PagingStoreFactoryNIO implements
PagingStoreFactory {
this.scheduledExecutor = scheduledExecutor;
this.syncTimeout = syncTimeout;
this.critialErrorListener = critialErrorListener;
+ this.purgePageFolders = purgePageFolders;
}
@@ -148,8 +163,7 @@ public class PagingStoreFactoryNIO implements
PagingStoreFactory {
@Override
public synchronized PagingStore newStore(final SimpleString address, final
AddressSettings settings) {
-
- return new PagingStoreImpl(address, scheduledExecutor, syncTimeout,
pagingManager, storageManager, null, this, address, settings,
executorFactory.getExecutor().setFair(true), syncNonTransactional);
+ return new PagingStoreImpl(address, scheduledExecutor, syncTimeout,
pagingManager, storageManager, null, this, address, settings,
executorFactory.getExecutor().setFair(true), syncNonTransactional,
purgePageFolders);
}
@Override
@@ -226,7 +240,7 @@ public class PagingStoreFactoryNIO implements
PagingStoreFactory {
AddressSettings settings =
addressSettingsRepository.getMatch(address.toString());
- PagingStore store = new PagingStoreImpl(address,
scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this,
address, settings, executorFactory.getExecutor(), syncNonTransactional);
+ PagingStore store = new PagingStoreImpl(address,
scheduledExecutor, syncTimeout, pagingManager, storageManager, factory, this,
address, settings, executorFactory.getExecutor(), syncNonTransactional,
purgePageFolders);
storesReturn.add(store);
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 6079410c62..6b3f2e76c8 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -70,6 +70,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.function.Function;
+import java.util.function.Supplier;
/**
* @see PagingStore
@@ -170,6 +171,10 @@ public class PagingStoreImpl implements PagingStore {
private long rejectThreshold;
+ private final Supplier<Boolean> purgePageFolder;
+
+ private final ScheduledExecutorService scheduledExecutorService;
+
public PagingStoreImpl(final SimpleString address,
final ScheduledExecutorService scheduledExecutor,
final long syncTimeout,
@@ -181,6 +186,24 @@ public class PagingStoreImpl implements PagingStore {
final AddressSettings addressSettings,
final ArtemisExecutor executor,
final boolean syncNonTransactional) {
+ this(address, scheduledExecutor, syncTimeout, pagingManager,
+ storageManager, fileFactory, storeFactory,
+ storeName, addressSettings, executor, syncNonTransactional,
+ () -> false);
+ }
+
+ public PagingStoreImpl(final SimpleString address,
+ final ScheduledExecutorService scheduledExecutor,
+ final long syncTimeout,
+ final PagingManager pagingManager,
+ final StorageManager storageManager,
+ final SequentialFileFactory fileFactory,
+ final PagingStoreFactory storeFactory,
+ final SimpleString storeName,
+ final AddressSettings addressSettings,
+ final ArtemisExecutor executor,
+ final boolean syncNonTransactional,
+ final Supplier<Boolean> purgePageFolder) {
Objects.requireNonNull(scheduledExecutor, "scheduledExecutor = null");
Objects.requireNonNull(pagingManager, "Paging Manager can't be null");
@@ -203,6 +226,8 @@ public class PagingStoreImpl implements PagingStore {
this.fileFactory = fileFactory;
+ this.purgePageFolder = purgePageFolder;
+
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
@@ -212,6 +237,8 @@ public class PagingStoreImpl implements PagingStore {
this.cursorProvider = storeFactory.newCursorProvider(this,
this.storageManager, addressSettings, executor);
this.usingGlobalMaxSize = pagingManager.isUsingGlobalSize();
+
+ this.scheduledExecutorService = scheduledExecutor;
}
// This is an extension point for unit tests to replace the creation of the
PagedTimeWriter
@@ -570,6 +597,11 @@ public class PagingStoreImpl implements PagingStore {
return fileFactory.getDirectoryName();
}
+ @Override
+ public boolean isStorePaging() {
+ return paging;
+ }
+
@Override
public boolean isPaging() {
AddressFullMessagePolicy policy = this.addressFullMessagePolicy;
@@ -810,11 +842,47 @@ public class PagingStoreImpl implements PagingStore {
pageLimitReleased();
}
this.cursorProvider.onPageModeCleared();
+ if (purgePageFolder.get()) {
+ execute(this::purgeFolder);
+ }
} finally {
writeUnlock();
}
}
+ @Override
+ public void purgeFolder() {
+ writeLock();
+ try {
+ if (!isStorePaging() && !hasPendingIO() && fileFactory != null) {
+
ActiveMQServerLogger.LOGGER.purgingPageFolder(fileFactory.getDirectoryName(),
storeName);
+ // closing used pages...
+ // all files need to be closed before we can remove a folder
+ usedPages.forEachUsedPage(this::closePage);
+ usedPages.clear();
+ closePage(currentPage);
+ currentPage = null;
+ numberOfPages = 0;
+ if (deleteFolder()) {
+ // we delete the subscription's journal information after the
folder is removed
+
cursorProvider.forEachSubscription(PageSubscription::deleteCursorInfo);
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ private void closePage(Page p) {
+ try {
+ if (p != null) {
+ p.close(true);
+ }
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
private String getPageInfo() {
return String.format("size=%d bytes (%d messages); maxSize=%d bytes (%d
messages); globalSize=%d bytes (%d messages); globalMaxSize=%d bytes (%d
messages);", size.getSize(), size.getElements(), maxSize, maxMessages,
pagingManager.getGlobalSize(), pagingManager.getGlobalMessages(),
pagingManager.getMaxSize(), pagingManager.getMaxMessages());
}
@@ -880,6 +948,9 @@ public class PagingStoreImpl implements PagingStore {
@Override
public boolean checkPageFileExists(final long pageNumber) {
+ if (fileFactory == null) {
+ return false;
+ }
String fileName = createFileName(pageNumber);
SequentialFileFactory factory = null;
@@ -920,6 +991,9 @@ public class PagingStoreImpl implements PagingStore {
@Override
public Page usePage(final long pageId, final boolean createEntry, final
boolean createFile) {
+ if (fileFactory == null) {
+ return null;
+ }
synchronized (usedPages) {
try {
Page page = usedPages.get(pageId);
@@ -951,12 +1025,56 @@ public class PagingStoreImpl implements PagingStore {
}
}
-
protected SequentialFileFactory getFileFactory() throws Exception {
checkFileFactory();
return fileFactory;
}
+ protected boolean deleteFolder() {
+ SequentialFileFactory sequentialFileFactory = fileFactory;
+ try {
+ if (sequentialFileFactory != null) {
+ List<String> files;
+ try {
+ files = sequentialFileFactory.listFiles(null);
+ } catch (Exception e) {
+ sequentialFileFactory.onIOError(e, e.getMessage());
+ return false;
+ }
+ files.forEach(f -> {
+ SequentialFile file =
sequentialFileFactory.createSequentialFile(f);
+ try {
+ logger.debug("Deleting {}", file);
+ file.delete();
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ sequentialFileFactory.onIOError(e, e.getMessage(),
file.getFileName());
+ }
+ });
+ logger.debug("Deleting directory {}",
sequentialFileFactory.getDirectory());
+ return deleteFolder(sequentialFileFactory);
+ }
+ return true;
+ } finally {
+ this.fileFactory = null;
+ }
+ }
+
+ private boolean deleteFolder(final SequentialFileFactory deletingFolder) {
+ if (!deletingFolder.deleteFolder()) {
+
ActiveMQServerLogger.LOGGER.failedPurgingFolder(deletingFolder.getDirectory().getAbsolutePath());
+ try {
+ List<String> filesStillExisting = deletingFolder.listFiles(null);
+ filesStillExisting.forEach(f -> logger.info("File {} still on
folder {}", f, deletingFolder.getDirectory().getAbsolutePath()));
+ } catch (Exception e) {
+ logger.warn(e.getMessage(), e);
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+
private SequentialFileFactory checkFileFactory() throws Exception {
SequentialFileFactory factory = fileFactory;
if (factory == null) {
@@ -1399,7 +1517,7 @@ public class PagingStoreImpl implements PagingStore {
int bytesToWrite = pagedMessage.getEncodeSize() +
PageReadWriter.SIZE_RECORD;
currentPageSize += bytesToWrite;
- if (currentPageSize > pageSize && currentPage.getNumberOfMessages() > 0)
{
+ if (currentPage == null || currentPageSize > pageSize &&
currentPage.getNumberOfMessages() > 0) {
// Make sure nothing is currently validating or using currentPage
openNewPage();
currentPageSize += bytesToWrite;
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 8037ed12bf..ee8138ca02 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -2034,7 +2034,7 @@ public class PostOfficeImpl implements PostOffice,
NotificationListener, Binding
if (!queue.isInternalQueue() && queue.isAutoDelete() &&
QueueManagerImpl.consumerCountCheck(queue) && (initialCheck ||
QueueManagerImpl.delayCheck(queue, settings)) &&
QueueManagerImpl.messageCountCheck(queue) && (initialCheck ||
queueWasUsed(queue, settings))) {
// we only reap queues on the initialCheck if they are actually
empty
PagingStore queuePagingStore = queue.getPagingStore();
- boolean isPaging = queuePagingStore != null &&
queuePagingStore.isPaging();
+ boolean isPaging = queuePagingStore != null &&
queuePagingStore.isStorePaging();
boolean validInitialCheck = initialCheck &&
queue.getMessageCount() == 0 && !isPaging;
if (validInitialCheck || queue.isSwept()) {
if (logger.isDebugEnabled()) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 1c4bcca247..2b46e3050e 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -1504,4 +1504,10 @@ public interface ActiveMQServerLogger {
@LogMessage(id = 224145, value = "Error looking up bindings for address
{}.", level = LogMessage.Level.WARN)
void bridgeBindingsLookupError(SimpleString address, Throwable e);
+ @LogMessage(id = 224146, value = "Purging Page Folder {} on address {}",
level = LogMessage.Level.INFO)
+ void purgingPageFolder(String folder, SimpleString address);
+
+ @LogMessage(id = 224147, value = "Failed purging folder {}", level =
LogMessage.Level.WARN)
+ void failedPurgingFolder(String folder);
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 4d06e8879c..9cc142d204 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -3122,7 +3122,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration)
configuration.getStoreConfiguration();
return new PagingStoreFactoryDatabase(dbConf, storageManager,
configuration.getPageSyncTimeout(), scheduledPool, pageExecutorFactory, false,
ioCriticalErrorListener);
} else {
- return new PagingStoreFactoryNIO(storageManager,
configuration.getPagingLocation(), configuration.getPageSyncTimeout(),
scheduledPool, pageExecutorFactory,
configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener);
+ return new PagingStoreFactoryNIO(storageManager,
configuration.getPagingLocation(), configuration.getPageSyncTimeout(),
scheduledPool, pageExecutorFactory,
configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener,
configuration::isPurgePageFolders);
}
}
@@ -4625,6 +4625,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
{
configuration.setConnectorConfigurations(config.getConnectorConfigurations());
configuration.setAcceptorConfigurations(config.getAcceptorConfigurations());
configuration.setAMQPConnectionConfigurations(config.getAMQPConnection());
+ configuration.setPurgePageFolders(config.isPurgePageFolders());
configurationReloadDeployed.set(false);
if (isActive()) {
configuration.parseProperties(propertiesFileUrl);
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 77369479d1..983728d378 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -999,7 +999,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (deliveriesInTransit.getCount() == 0 &&
getExecutor().isFlushed() &&
intermediateMessageReferences.isEmpty() &&
messageReferences.isEmpty() &&
pageIterator != null && !pageIterator.hasNext() &&
- pageSubscription != null && !pageSubscription.isPaging()) {
+ pageSubscription != null &&
!pageSubscription.isStorePaging()) {
// We must block on the executor to ensure any async
deliveries have completed or we might get out of order
// deliveries
// Go into direct delivery mode
@@ -1061,7 +1061,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
@Override
public void forceDelivery() {
- if (pageSubscription != null && pageSubscription.isPaging()) {
+ if (pageSubscription != null && pageSubscription.isStorePaging()) {
logger.trace("Force delivery scheduling depage");
scheduleDepage(false);
}
@@ -1097,7 +1097,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
@Override
public ArtemisExecutor getExecutor() {
- if (pageSubscription != null && pageSubscription.isPaging()) {
+ if (pageSubscription != null && pageSubscription.isStorePaging()) {
// When in page mode, we don't want to have concurrent IO on the same
PageStore
return pageSubscription.getPagingStore().getExecutor();
} else {
@@ -3086,7 +3086,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
if (queueDestroyed) {
return;
}
- if (pageIterator != null && pageSubscription.isPaging()) {
+ if (pageIterator != null && pageSubscription.isStorePaging()) {
if (logger.isDebugEnabled()) {
logger.debug("CheckDepage on queue name {}, id={}",
queueConfiguration.getName(), queueConfiguration.getId());
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index be4b57e59a..c92486a4f2 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -565,6 +565,15 @@
<xsd:element ref="grouping-handler" maxOccurs="1" minOccurs="0"/>
+ <xsd:element name="purge-page-folders" type="xsd:boolean"
default="false" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ Specifies whether paging folders should be automatically
removed when the address
+ exits the paging state.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="paging-directory" type="xsd:string"
default="data/paging" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index d7132f592e..0def41b650 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -130,6 +130,7 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
@Test
public void testDefaults() {
+ assertEquals(ActiveMQDefaultConfiguration.getPurgePageFolders(),
conf.isPurgePageFolders());
assertEquals(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(),
conf.getScheduledThreadPoolMaxSize());
assertEquals(ActiveMQDefaultConfiguration.getDefaultSecurityInvalidationInterval(),
conf.getSecurityInvalidationInterval());
assertEquals(ActiveMQDefaultConfiguration.isDefaultSecurityEnabled(),
conf.isSecurityEnabled());
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index b2b48928d0..5461097752 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -58,6 +58,19 @@ import org.xml.sax.SAXParseException;
public class FileConfigurationParserTest extends ServerTestBase {
+ private static final String PURGE_FOLDER_FALSE = """
+ <configuration>
+ <core>
+ <purge-page-folders>false</purge-page-folders>
+ </core></configuration>""";
+
+ private static final String PURGE_FOLDER_TRUE = """
+ <configuration>
+ <core>
+ <purge-page-folders>true</purge-page-folders>
+ </core></configuration>""";
+
+
private static final String FIRST_PART = """
<core xmlns="urn:activemq:core">
<name>ActiveMQ.main.config</name>
@@ -204,6 +217,24 @@ public class FileConfigurationParserTest extends
ServerTestBase {
assertEquals(333,
config.getClusterConfigurations().get(0).getRetryInterval());
}
+ // Parses a minimal configuration with purge-page-folders set to true and then to false,
+ // checking that Configuration.isPurgePageFolders() reflects each value.
+ @Test
+ public void testParsePurgePageFolder() throws Exception {
+
+ FileConfigurationParser parser = new FileConfigurationParser();
+ {
+ // explicit true
+ ByteArrayInputStream input = new
ByteArrayInputStream(PURGE_FOLDER_TRUE.getBytes(StandardCharsets.UTF_8));
+ Configuration config = parser.parseMainConfig(input);
+ assertTrue(config.isPurgePageFolders());
+ }
+
+ {
+ // explicit false
+ ByteArrayInputStream input = new
ByteArrayInputStream(PURGE_FOLDER_FALSE.getBytes(StandardCharsets.UTF_8));
+ Configuration config = parser.parseMainConfig(input);
+ assertFalse(config.isPurgePageFolders());
+ }
+
+ }
+
@Test
public void testParsingZeroIDCacheSize() throws Exception {
FileConfigurationParser parser = new FileConfigurationParser();
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImplTest.java
index a0b5ece37f..198a6e3ddf 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImplTest.java
@@ -68,6 +68,7 @@ public class ManagementServiceImplTest {
Mockito.when(messagingServer.getPostOffice()).thenReturn(postOffice);
Mockito.when(pagingManager.getPageStore(Mockito.any(SimpleString.class))).thenReturn(pageStore);
Mockito.when(pageStore.isPaging()).thenReturn(true);
+ Mockito.when(pageStore.isStorePaging()).thenReturn(true);
managementService.registerServer(null, securityStore, null,
configuration, null, null, null, null, messagingServer, null, null,
pagingManager, false);
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index f8744d4154..f6ac7e8340 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -563,6 +563,7 @@
<paging-directory>pagingdir</paging-directory>
<bindings-directory>somedir</bindings-directory>
<create-bindings-dir>false</create-bindings-dir>
+ <purge-page-folders>true</purge-page-folders>
<page-max-concurrent-io>17</page-max-concurrent-io>
<read-whole-page>true</read-whole-page>
<journal-directory>somedir2</journal-directory>
diff --git
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/TestParameters.java
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/TestParameters.java
index 2e20f59786..8b03efe0a5 100644
---
a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/TestParameters.java
+++
b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/utils/TestParameters.java
@@ -29,9 +29,10 @@ import org.slf4j.LoggerFactory;
*/
public class TestParameters {
-
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ public static final String NFS_FOLDER = getVariable("ARTEMIS_NFS", null);
+
private static String propertyName(String testName, String property) {
if (testName == null) {
return "TEST_" + property;
@@ -62,6 +63,15 @@ public class TestParameters {
property = propertyName(testName, property);
+ String value = getVariable(property, defaultValue);
+
+ logger.info("{}={}", property, value);
+
+ return value;
+
+ }
+
+ public static String getVariable(String property, String defaultValue) {
String value = System.getenv(property);
if (value == null) {
value = System.getProperty(property);
@@ -72,8 +82,6 @@ public class TestParameters {
value = defaultValue;
}
- logger.info("{}={}", property, value);
-
return value;
}
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/Database.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/Database.java
index e3b0ba1698..d73517d06b 100644
---
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/Database.java
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/common/Database.java
@@ -155,7 +155,7 @@ public enum Database {
return dbList;
}
- public static Database random() {
+ public static Database randomDB() {
List<Database> selectedDatabases = selectedList();
if (selectedDatabases.isEmpty()) {
return null;
@@ -167,7 +167,7 @@ public enum Database {
public static List<Database> randomList() {
List<Database> selectedDatabases = selectedList();
List<Database> list = new ArrayList<>();
- Database randomDB = random();
+ Database randomDB = randomDB();
if (randomDB != null) {
list.add(randomDB);
}
diff --git
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
index 633086ece1..1889cc7c3c 100644
---
a/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
+++
b/tests/db-tests/src/test/java/org/apache/activemq/artemis/tests/db/paging/PagingTest.java
@@ -118,6 +118,7 @@ import
org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import
org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.artemis.tests.db.common.Database;
import org.apache.activemq.artemis.tests.db.common.ParameterDBTestBase;
+import org.apache.activemq.artemis.tests.extensions.parameterized.Parameter;
import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
import org.apache.activemq.artemis.tests.extensions.parameterized.Parameters;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -149,17 +150,22 @@ public class PagingTest extends ParameterDBTestBase {
protected ClientSessionFactory sf;
private AssertionLoggerHandler loggerHandler;
- @Parameters(name = "db={0}")
- public static Collection<Object[]> parameters() {
- List<Database> databases = new ArrayList<>();
- databases.add(Database.JOURNAL);
+ @Parameter(index = 1)
+ public boolean purgeFolders;
+
- // PagingTest is quite expensive. And it's not really needed to run
every single database every time
- // we will just pick one!
- databases.addAll(Database.randomList());
+ @Override
+ protected Configuration createDefaultInVMConfig() throws Exception {
+ return super.createDefaultConfig(0,
false).setPurgePageFolders(purgeFolders);
+ }
- // we will run PagingTest in one database and the journal to validate
the codebase in both implementations
- return convertParameters(databases);
+ /**
+ * Parameter combinations for this test: the journal with and without folder
+ * purging, plus one randomly selected database (without purging) when a
+ * database is available for the run.
+ */
+ @Parameters(name = "db={0}, purgeFolders={1}")
+ public static Collection<Object[]> parameters() {
+ List<Object[]> databases = new ArrayList<>();
+ databases.add(new Object[] {Database.JOURNAL, true});
+ databases.add(new Object[] {Database.JOURNAL, false});
+ // Database.randomDB() returns null when no database is selected for the run;
+ // adding it unconditionally would create a parameter row with a null database
+ Database randomDB = Database.randomDB();
+ if (randomDB != null) {
+ databases.add(new Object[] {randomDB, false});
+ }
+ return databases;
}
@Override
@@ -756,8 +762,6 @@ public class PagingTest extends ParameterDBTestBase {
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
- ClientMessage message = null;
-
byte[] body = new byte[10];
ByteBuffer bb = ByteBuffer.wrap(body);
@@ -770,20 +774,14 @@ public class PagingTest extends ParameterDBTestBase {
for (int repeat = 0; repeat < 2; repeat++) {
queue.getPagingStore().startPaging();
- int page = 1;
for (int i = 0; i < numberOfMessages; i++) {
- if (i % 10 == 0 && i > 0) {
- queue.getPagingStore().forceAnotherPage(true);
- page++;
- }
- message = session.createMessage(true);
+ ClientMessage message = session.createMessage(true);
ActiveMQBuffer bodyLocal = message.getBodyBuffer();
bodyLocal.writeBytes(body);
message.putIntProperty("i", i);
- message.putIntProperty("page", page);
producer.send(message);
}
@@ -3712,7 +3710,7 @@ public class PagingTest extends ParameterDBTestBase {
server = new ActiveMQServerImpl(config,
ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
@Override
protected PagingStoreFactoryNIO getPagingStoreFactory() {
- return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null, () ->
purgeFolders) {
@Override
public PageCursorProvider newCursorProvider(PagingStore store,
StorageManager
storageManager,
@@ -3864,7 +3862,7 @@ public class PagingTest extends ParameterDBTestBase {
server = new ActiveMQServerImpl(config,
ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
@Override
protected PagingStoreFactoryNIO getPagingStoreFactory() {
- return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null, () ->
purgeFolders) {
@Override
public PageCursorProvider newCursorProvider(PagingStore store,
StorageManager
storageManager,
@@ -6086,7 +6084,7 @@ public class PagingTest extends ParameterDBTestBase {
AddressSettings addressSettings,
ArtemisExecutor executor,
boolean syncNonTransactional) {
- super(address, scheduledExecutor, syncTimeout, pagingManager,
storageManager, fileFactory, storeFactory, storeName, addressSettings,
executor, syncNonTransactional);
+ super(address, scheduledExecutor, syncTimeout, pagingManager,
storageManager, fileFactory, storeFactory, storeName, addressSettings,
executor, syncNonTransactional, () -> purgeFolders);
}
/**
@@ -6114,7 +6112,7 @@ public class PagingTest extends ParameterDBTestBase {
server = new ActiveMQServerImpl(config,
ManagementFactory.getPlatformMBeanServer(), new ActiveMQSecurityManagerImpl()) {
@Override
protected PagingStoreFactoryNIO getPagingStoreFactory() {
- return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null, () ->
purgeFolders) {
@Override
public synchronized PagingStore newStore(SimpleString
address, AddressSettings settings) {
return new NonStoppablePagingStoreImpl(address,
this.getScheduledExecutor(), config.getJournalBufferTimeout_NIO(),
getPagingManager(), getStorageManager(), null, this, address, settings,
getExecutorFactory().getExecutor(), this.isSyncNonTransactional());
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PrimaryCrashOnBackupSyncTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PrimaryCrashOnBackupSyncTest.java
index eae40f7c87..c45d4352cf 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PrimaryCrashOnBackupSyncTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/PrimaryCrashOnBackupSyncTest.java
@@ -215,7 +215,7 @@ public class PrimaryCrashOnBackupSyncTest extends
ActiveMQTestBase {
ActiveMQServer primaryServer = new
ActiveMQServerImpl(primaryConfiguration,
ManagementFactory.getPlatformMBeanServer(), new
ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new
SecurityConfiguration())) {
@Override
protected PagingStoreFactoryNIO getPagingStoreFactory() {
- return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null) {
+ return new PagingStoreFactoryNIO(this.getStorageManager(),
this.getConfiguration().getPagingLocation(),
this.getConfiguration().getJournalBufferTimeout_NIO(), this.getScheduledPool(),
this.getExecutorFactory(),
this.getConfiguration().isJournalSyncNonTransactional(), null, () -> true) {
@Override
public synchronized PagingStore newStore(SimpleString
address, AddressSettings settings) {
return new DelayPagingStoreImpl(address,
this.getScheduledExecutor(),
primaryConfiguration.getJournalBufferTimeout_NIO(), getPagingManager(),
getStorageManager(), null, this, address, settings,
getExecutorFactory().getExecutor(), getExecutorFactory().getExecutor(),
this.isSyncNonTransactional());
@@ -268,7 +268,7 @@ class DelayPagingStoreImpl extends PagingStoreImpl {
AddressSettings addressSettings,
ArtemisExecutor executor, ArtemisExecutor
ioExecutor,
boolean syncNonTransactional) {
- super(address, scheduledExecutor, syncTimeout, pagingManager,
storageManager, fileFactory, storeFactory, storeName, addressSettings,
executor, syncNonTransactional);
+ super(address, scheduledExecutor, syncTimeout, pagingManager,
storageManager, fileFactory, storeFactory, storeName, addressSettings,
executor, syncNonTransactional, () -> true);
}
@Override
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
index 9e4541ca7b..345627697b 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/ReplicatedPagedFailoverTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.failover;
+import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@@ -32,11 +33,17 @@ import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class ReplicatedPagedFailoverTest extends ReplicatedFailoverTest {
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
@Override
protected ActiveMQServer createInVMFailoverServer(final boolean realFiles,
final Configuration
configuration,
@@ -125,4 +132,49 @@ public class ReplicatedPagedFailoverTest extends
ReplicatedFailoverTest {
Wait.assertFalse(queue.getPageSubscription().getPagingStore()::isPaging);
}
+
+ // Repeatedly forces the address into paging, sends and consumes a batch of
+ // messages, and waits for the store to leave paging after each round.
+ @Test
+ public void testMultipleCleanup() throws Exception {
+ int numMessages = 50;
+ int repeats = 3;
+ createSessionFactory();
+ try (ClientSession session = createSession(sf, false, false)) {
+
+
session.createQueue(QueueConfiguration.of(FailoverTestBase.ADDRESS).setDurable(true));
+
+ ClientProducer producer =
session.createProducer(FailoverTestBase.ADDRESS);
+
+ Queue queue =
primaryServer.getServer().locateQueue(FailoverTest.ADDRESS);
+ // validate the lookup before the queue is dereferenced inside the loop
+ assertNotNull(queue);
+
+ for (int runNumber = 0; runNumber < repeats; runNumber++) {
+
+ logger.info("Repeat #{}", runNumber);
+
+
assertTrue(queue.getPageSubscription().getPagingStore().startPaging());
+
+ for (int i = 0; i < numMessages; i++) {
+ logger.debug("Send {}", i);
+ producer.send(createMessage(session, i, true));
+ }
+ session.commit();
+
+ try (ClientConsumer consumer =
session.createConsumer(FailoverTestBase.ADDRESS, false)) {
+ session.start();
+
+ for (int i = 0; i < numMessages; i++) {
+ ClientMessage msg = consumer.receive(500);
+ assertNotNull(msg);
+ msg.acknowledge();
+ logger.debug("consumed {}", i);
+ }
+ assertNull(consumer.receiveImmediate());
+ }
+ session.commit();
+
+
Wait.assertFalse(queue.getPageSubscription().getPagingStore()::isPaging, 5000,
100);
+ }
+ }
+ }
+
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCleanupFolderTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCleanupFolderTest.java
new file mode 100644
index 0000000000..fc3b4532a6
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/PageCleanupFolderTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.ReusableLatch;
+import org.apache.activemq.artemis.utils.TestParameters;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Validates that page folders are purged once the addresses leave paging.
+ * It is recommended to set up the variable ARTEMIS_NFS to use an NFS mount on
+ * this test.
+ */
+public class PageCleanupFolderTest extends ActiveMQTestBase {
+
+ protected static final int PAGE_MAX = 100 * 1024;
+ protected static final int PAGE_SIZE = 10 * 1024;
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ protected ServerLocator locator;
+ protected ActiveMQServer server;
+ protected ClientSessionFactory sf;
+ private AssertionLoggerHandler loggerHandler;
+
+ // Sends and consumes messages on several queues in parallel, then checks that
+ // every page folder was purged (AMQ224146 logged, AMQ224147 never logged) and
+ // that the paging location stays empty across a server restart.
+ @Test
+ public void testCleanupFolders() throws Exception {
+
+ Configuration config =
createDefaultConfig(true).setPurgePageFolders(true).setJournalType(JournalType.NIO);
+
+ if (TestParameters.NFS_FOLDER != null) {
+ File dataFolder = new File(TestParameters.NFS_FOLDER +
"/PageCleanupFolderTest");
+ deleteDirectory(dataFolder);
+ config.setJournalDirectory(dataFolder.getAbsolutePath() + "/journal").
+ setPagingDirectory(dataFolder.getAbsolutePath() + "/paging").
+ setBindingsDirectory(dataFolder.getAbsolutePath() + "/binding").
+ setLargeMessagesDirectory(dataFolder.getAbsolutePath() +
"/largeMessages");
+ }
+
+ // local values used for this server; named differently from the class
+ // constants so they no longer shadow PAGE_MAX / PAGE_SIZE
+ final int pageMax = 0;
+
+ final int pageSize = 1024 * 1024;
+
+ server = createServer(true, config, pageSize, pageMax, -1, -1);
+ server.start();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(20);
+ runAfter(executorService::shutdownNow);
+
+ int destinations = 20;
+
+ for (int serverStart = 0; serverStart < 3; serverStart++) {
+ try (AssertionLoggerHandler assertionLoggerHandler = new
AssertionLoggerHandler()) {
+ AtomicInteger errors = new AtomicInteger(0);
+ ReusableLatch done = new ReusableLatch(destinations);
+ for (int i = 0; i < destinations; i++) {
+ final int destinationID = i;
+ executorService.execute(() -> {
+ ConnectionFactory factory =
CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createQueue("queue" + destinationID));
+ for (int s = 0; s < 100; s++) {
+ producer.send(session.createTextMessage("hello " + s));
+ }
+ session.commit();
+ } catch (Throwable e) {
+ // Throwable on purpose: anything failing on the worker thread must be counted
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+ assertTrue(done.await(50, TimeUnit.SECONDS));
+
+ done.setCount(destinations);
+ for (int i = 0; i < destinations; i++) {
+ final int destinationID = i;
+ executorService.execute(() -> {
+ ConnectionFactory factory =
CFUtil.createConnectionFactory("CORE", "tcp://localhost:61616");
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue("queue" + destinationID));
+ connection.start();
+ for (int s = 0; s < 100; s++) {
+ assertNotNull(consumer.receive(5000));
+ }
+ session.commit();
+ } catch (Throwable e) {
+ // assertNotNull throws an AssertionError, which is not an Exception;
+ // catching Throwable makes a missing message count as an error
+ errors.incrementAndGet();
+ logger.warn(e.getMessage(), e);
+ } finally {
+ done.countDown();
+ }
+ });
+ }
+ assertTrue(done.await(5, TimeUnit.SECONDS));
+ assertEquals(0, errors.get());
+
+ for (int i = 0; i < destinations; i++) {
+ Queue queue = server.locateQueue("queue" + i);
+ assertNotNull(queue);
+ Wait.assertFalse(queue.getPagingStore()::isPaging, 5000, 100);
+ }
+
+ // this is the code for purging a page folder
+ Wait.assertTrue(() ->
assertionLoggerHandler.countText("AMQ224146") >= destinations, 5000, 100);
+ // this is the code to "fail to purge the page folder"
+ assertEquals(0, assertionLoggerHandler.countText("AMQ224147"));
+
+ File pageFolderLocation =
server.getConfiguration().getPagingLocation();
+ Wait.assertEquals(0, () -> pageFolderLocation.listFiles().length,
5000, 100);
+
+ server.stop();
+ server.start();
+
+ Wait.assertEquals(0, () -> pageFolderLocation.listFiles().length,
5000, 100);
+ }
+
+ }
+ }
+}
diff --git
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
index 0a784f21a4..c3777bb8b0 100644
---
a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
+++
b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java
@@ -415,6 +415,11 @@ public class PersistMultiThreadTest extends
ActiveMQTestBase {
return false;
}
+ @Override
+ public boolean isStorePaging() {
+ return false;
+ }
+
@Override
public Page usePage(long page, boolean create) {
return null;
diff --git
a/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml
b/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml
deleted file mode 100644
index 44dabb080c..0000000000
--- a/tests/soak-tests/src/main/resources/servers/horizontalPaging/broker.xml
+++ /dev/null
@@ -1,264 +0,0 @@
-<?xml version='1.0'?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<configuration xmlns="urn:activemq"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:xi="http://www.w3.org/2001/XInclude"
- xsi:schemaLocation="urn:activemq
/schema/artemis-configuration.xsd">
-
- <core xmlns="urn:activemq:core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:activemq:core ">
-
- <name>0.0.0.0</name>
-
-
- <persistence-enabled>true</persistence-enabled>
-
- <!-- this could be ASYNCIO, MAPPED, NIO
- ASYNCIO: Linux Libaio
- MAPPED: mmap files
- NIO: Plain Java Files
- -->
- <journal-type>ASYNCIO</journal-type>
-
- <paging-directory>data/paging</paging-directory>
-
- <bindings-directory>data/bindings</bindings-directory>
-
- <journal-directory>data/journal</journal-directory>
-
- <large-messages-directory>data/large-messages</large-messages-directory>
-
-
- <!-- if you want to retain your journal uncomment this following
configuration.
-
- This will allow your system to keep 7 days of your data, up to 10G.
Tweak it accordingly to your use case and capacity.
-
- it is recommended to use a separate storage unit from the journal for
performance considerations.
-
- <journal-retention-directory period="7" unit="DAYS"
storage-limit="10G">data/retention</journal-retention-directory>
-
- You can also enable retention by using the argument journal-retention on
the `artemis create` command -->
-
-
-
- <journal-datasync>true</journal-datasync>
-
- <journal-min-files>2</journal-min-files>
-
- <journal-pool-files>10</journal-pool-files>
-
- <journal-device-block-size>4096</journal-device-block-size>
-
- <journal-file-size>10M</journal-file-size>
-
- <message-expiry-scan-period>-1</message-expiry-scan-period>
-
- <page-max-concurrent-io>5</page-max-concurrent-io>
-
- <!--
- This value was determined through a calculation.
- Your system could perform 10.42 writes per millisecond
- on the current journal configuration.
- That translates as a sync write every 96000 nanoseconds.
-
- Note: If you specify 0 the system will perform writes directly to the
disk.
- We recommend this to be 0 if you are using journalType=MAPPED and
journal-datasync=false.
- -->
- <journal-buffer-timeout>96000</journal-buffer-timeout>
-
-
- <!--
- When using ASYNCIO, this will determine the writing queue depth for
libaio.
- -->
- <journal-max-io>4096</journal-max-io>
- <!--
- You can verify the network health of a particular NIC by specifying
the <network-check-NIC> element.
- <network-check-NIC>theNicName</network-check-NIC>
- -->
-
- <!--
- Use this to use an HTTP server to validate the network
-
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
-
- <!-- <network-check-period>10000</network-check-period> -->
- <!-- <network-check-timeout>1000</network-check-timeout> -->
-
- <!-- this is a comma separated list, no spaces, just DNS or IPs
- it should accept IPV6
-
- Warning: Make sure you understand your network topology as this is
meant to validate if your network is valid.
- Using IPs that could eventually disappear or be partially
visible may defeat the purpose.
- You can use a list of multiple IPs, and if any successful
ping will make the server OK to continue running -->
- <!-- <network-check-list>10.0.0.1</network-check-list> -->
-
- <!-- use this to customize the ping used for ipv4 addresses -->
- <!-- <network-check-ping-command>ping -c 1 -t %d
%s</network-check-ping-command> -->
-
- <!-- use this to customize the ping used for ipv6 addresses -->
- <!-- <network-check-ping6-command>ping6 -c 1
%2$s</network-check-ping6-command> -->
-
-
-
-
- <!-- how often we are looking for how many bytes are being used on the
disk in ms -->
- <disk-scan-period>5000</disk-scan-period>
-
- <!-- once the disk hits this limit the system will block, or close the
connection in certain protocols
- that won't support flow control. -->
- <max-disk-usage>90</max-disk-usage>
-
- <!-- should the broker detect dead locks and other issues -->
- <critical-analyzer>true</critical-analyzer>
-
- <critical-analyzer-timeout>120000</critical-analyzer-timeout>
-
- <critical-analyzer-check-period>60000</critical-analyzer-check-period>
-
- <critical-analyzer-policy>HALT</critical-analyzer-policy>
-
-
- <page-sync-timeout>8332000</page-sync-timeout>
-
-
- <!-- the system will enter into page mode once you hit this limit. This
is an estimate in bytes of how much the messages are using in memory
-
- The system will use half of the available memory (-Xmx) by default for
the global-max-size.
- You may specify a different value here if you need to customize it to
your needs.
-
- <global-max-size>100Mb</global-max-size> -->
-
- <!-- the maximum number of messages accepted before entering full
address mode.
- if global-max-size is specified the full address mode will be
specified by whatever hits it first. -->
- <global-max-messages>-1</global-max-messages>
-
- <acceptors>
-
- <!-- useEpoll means: it will use Netty epoll if you are on a system
(Linux) that supports it -->
- <!-- amqpCredits: The number of credits sent to AMQP producers -->
- <!-- amqpLowCredits: The server will send the # credits specified at
amqpCredits at this low mark -->
- <!-- amqpDuplicateDetection: If you are not using duplicate
detection, set this to false
- as duplicate detection requires
applicationProperties to be parsed on the server. -->
- <!-- amqpMinLargeMessageSize: Determines how many bytes are
considered large, so we start using files to hold their data.
- default: 102400, -1 would mean to
disable large message control -->
-
- <!-- Note: If an acceptor needs to be compatible with HornetQ and/or
Artemis 1.x clients add
- "anycastPrefix=jms.queue.;multicastPrefix=jms.topic." to
the acceptor url.
- See https://issues.apache.org/jira/browse/ARTEMIS-1644 for
more information. -->
-
-
- <!-- Acceptor for every supported protocol -->
- <acceptor
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true;supportAdvisory=false;suppressInternalManagementObjects=false</acceptor>
-
- <!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
- <acceptor
name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpMinLargeMessageSize=102400;amqpDuplicateDetection=true</acceptor>
-
- <!-- STOMP Acceptor. -->
- <acceptor
name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
-
- <!-- HornetQ Compatibility Acceptor. Enables HornetQ Core and STOMP
for legacy HornetQ clients. -->
- <acceptor
name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>
-
- <!-- MQTT Acceptor -->
- <acceptor
name="mqtt">tcp://0.0.0.0:1883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>
-
- </acceptors>
-
-
- <security-settings>
- <security-setting match="#">
- <permission type="createNonDurableQueue" roles="amq"/>
- <permission type="deleteNonDurableQueue" roles="amq"/>
- <permission type="createDurableQueue" roles="amq"/>
- <permission type="deleteDurableQueue" roles="amq"/>
- <permission type="createAddress" roles="amq"/>
- <permission type="deleteAddress" roles="amq"/>
- <permission type="consume" roles="amq"/>
- <permission type="browse" roles="amq"/>
- <permission type="send" roles="amq"/>
- <!-- we need this otherwise ./artemis data imp wouldn't work -->
- <permission type="manage" roles="amq"/>
- </security-setting>
- </security-settings>
-
- <address-settings>
- <!-- if you define auto-create on certain queues, management has to
be auto-create -->
- <address-setting match="activemq.management.#">
- <dead-letter-address>DLQ</dead-letter-address>
- <expiry-address>ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
- <!-- with -1 only the global-max-size is in use for limiting -->
- <max-size-bytes>-1</max-size-bytes>
-
<message-counter-history-day-limit>10</message-counter-history-day-limit>
- <address-full-policy>PAGE</address-full-policy>
- <auto-create-queues>true</auto-create-queues>
- <auto-create-addresses>true</auto-create-addresses>
- </address-setting>
- <!--default for catch all-->
- <address-setting match="#">
- <dead-letter-address>DLQ</dead-letter-address>
- <expiry-address>ExpiryQueue</expiry-address>
- <redelivery-delay>0</redelivery-delay>
- <max-size-bytes>0</max-size-bytes>
- <max-size-messages>0</max-size-messages>
- <page-size-bytes>5M</page-size-bytes>
- <max-read-page-bytes>100M</max-read-page-bytes>
- <max-read-page-messages>-1</max-read-page-messages>
- <prefetch-page-bytes>1M</prefetch-page-bytes>
-
<message-counter-history-day-limit>10</message-counter-history-day-limit>
- <address-full-policy>PAGE</address-full-policy>
- <auto-create-queues>true</auto-create-queues>
- <auto-create-addresses>true</auto-create-addresses>
- <auto-delete-queues>false</auto-delete-queues>
- <auto-delete-addresses>false</auto-delete-addresses>
- </address-setting>
- </address-settings>
-
- <addresses>
- <address name="DLQ">
- <anycast>
- <queue name="DLQ" />
- </anycast>
- </address>
- <address name="ExpiryQueue">
- <anycast>
- <queue name="ExpiryQueue" />
- </anycast>
- </address>
-
- </addresses>
-
-
- <!-- Uncomment the following if you want to use the Standard
LoggingActiveMQServerPlugin pluging to log in events
- <broker-plugins>
- <broker-plugin
class-name="org.apache.activemq.artemis.core.server.plugin.impl.LoggingActiveMQServerPlugin">
- <property key="LOG_ALL_EVENTS" value="true"/>
- <property key="LOG_CONNECTION_EVENTS" value="true"/>
- <property key="LOG_SESSION_EVENTS" value="true"/>
- <property key="LOG_CONSUMER_EVENTS" value="true"/>
- <property key="LOG_DELIVERING_EVENTS" value="true"/>
- <property key="LOG_SENDING_EVENTS" value="true"/>
- <property key="LOG_INTERNAL_EVENTS" value="true"/>
- </broker-plugin>
- </broker-plugins>
- -->
-
- </core>
-</configuration>
diff --git
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java
index bf071c5f69..0d3f2d9871 100644
---
a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java
+++
b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/paging/HorizontalPagingTest.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.soak.paging;
import static org.apache.activemq.artemis.utils.TestParameters.testProperty;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@ -30,6 +31,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
+import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -43,7 +45,7 @@ import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TestParameters;
import org.apache.activemq.artemis.cli.commands.helper.HelperCreate;
-import org.junit.jupiter.api.BeforeAll;
+import org.apache.activemq.artemis.utils.Wait;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -52,8 +54,9 @@ import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
/**
- * Refer to ./scripts/parameters.sh for suggested parameters #You may choose
to use zip files to save some time on
- * producing if you want to run this test over and over when debugging export
TEST_HORIZONTAL_ZIP_LOCATION=a folder
+ * Refer to ./scripts/parameters.sh for suggested parameters.
+ *
+ * It is recommended to set ARTEMIS_NFS to a mounted NFS folder in order to
run this test.
*/
public class HorizontalPagingTest extends SoakTestBase {
@@ -78,28 +81,35 @@ public class HorizontalPagingTest extends SoakTestBase {
public static final String SERVER_NAME_0 = "horizontalPaging";
- @BeforeAll
- public static void createServers() throws Exception {
- {
- File serverLocation = getFileServerLocation(SERVER_NAME_0);
- deleteDirectory(serverLocation);
-
- File dataFolder = null;
- if (DATA_FOLDER != null) {
- dataFolder = new File(DATA_FOLDER);
- if (dataFolder.exists()) {
- deleteDirectory(dataFolder);
- }
- }
+ public static void createLocalServer(boolean purgePage) throws Exception {
+ File serverLocation = getFileServerLocation(SERVER_NAME_0);
+ deleteDirectory(serverLocation);
+
+ File dataFolder = selectedDataFolder();
+ deleteDirectory(dataFolder);
+
+ HelperCreate cliCreateServer = helperCreate();
+
cliCreateServer.setRole("amq").setUser("admin").setPassword("admin").setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(serverLocation);
+ cliCreateServer.setArgs("--java-memory",
"2g").setPurgePageFolders(purgePage);
+ cliCreateServer.createServer();
+
+ File brokerXml = new File(getServerLocation(SERVER_NAME_0),
"/etc/broker.xml");
- HelperCreate cliCreateServer = helperCreate();
-
cliCreateServer.setRole("amq").setUser("admin").setPassword("admin").setAllowAnonymous(true).setNoWeb(false).setArtemisInstance(serverLocation);
- cliCreateServer.setArgs("--java-memory", "2g");
-
cliCreateServer.setConfiguration("./src/main/resources/servers/horizontalPaging");
- cliCreateServer.createServer();
+ assertNotNull(dataFolder);
- if (dataFolder != null) {
- assertTrue(FileUtil.findReplace(new
File(getFileServerLocation(SERVER_NAME_0), "/etc/broker.xml"), "data/",
dataFolder.getAbsolutePath() + "/"));
+ assertTrue(FileUtil.findReplace(brokerXml,
"<max-size-messages>-1</max-size-messages>",
"<max-size-messages>0</max-size-messages>"));
+ assertTrue(FileUtil.findReplace(new
File(getFileServerLocation(SERVER_NAME_0), "/etc/broker.xml"), "./data/",
dataFolder.getAbsolutePath() + "/"));
+
+ }
+
+ private static File selectedDataFolder() {
+ if (DATA_FOLDER != null) {
+ return new File(DATA_FOLDER);
+ } else {
+ if (TestParameters.NFS_FOLDER != null) {
+ return new File(TestParameters.NFS_FOLDER +
"/horizontalPagingTest");
+ } else {
+ return new File(getServerLocation(SERVER_NAME_0), "data");
}
}
}
@@ -120,8 +130,6 @@ public class HorizontalPagingTest extends SoakTestBase {
public void before() throws Exception {
assumeTrue(TEST_ENABLED);
cleanupData(SERVER_NAME_0);
-
- serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
}
/// ///////////////////////////////////////////////////
@@ -129,21 +137,25 @@ public class HorizontalPagingTest extends SoakTestBase {
/// as the server has to be killed within the timeframe of protocol being
executed
/// to validate proper callbacks are in place
@Test
- public void testHorizontalAMQP() throws Exception {
- testHorizontal("AMQP");
+ public void testHorizontalAMQPAndPurge() throws Exception {
+
+ // purging is enabled for only one protocol: exercising it once is enough, there is no need to repeat it per protocol
+ testHorizontal("AMQP", true);
}
@Test
public void testHorizontalCORE() throws Exception {
- testHorizontal("CORE");
+ testHorizontal("CORE", false);
}
@Test
public void testHorizontalOPENWIRE() throws Exception {
- testHorizontal("OPENWIRE");
+ testHorizontal("OPENWIRE", false);
}
- private void testHorizontal(String protocol) throws Exception {
+ private void testHorizontal(String protocol, boolean purgePage) throws
Exception {
+ createLocalServer(purgePage);
+ serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
AtomicInteger errors = new AtomicInteger(0);
ExecutorService service = Executors.newFixedThreadPool(EXECUTOR_SIZE);
@@ -189,7 +201,8 @@ public class HorizontalPagingTest extends SoakTestBase {
}
killServer(serverProcess, true);
- serverProcess = startServer(SERVER_NAME_0, 0, -1);
+
+ serverProcess = startServer(SERVER_NAME_0, 0, SERVER_START_TIMEOUT);
assertTrue(ServerUtil.waitForServerToStart(0, SERVER_START_TIMEOUT));
assertEquals(0, errors.get());
@@ -262,9 +275,23 @@ public class HorizontalPagingTest extends SoakTestBase {
assertEquals(0, errors.get());
assertEquals(DESTINATIONS, completedFine.get());
- killServer(serverProcess, true);
+ validatePageFolders(purgePage);
+
+ killServer(serverProcess, false);
+
serverProcess = startServer(SERVER_NAME_0, 0, -1);
assertTrue(ServerUtil.waitForServerToStart(0, SERVER_START_TIMEOUT));
+
+ validatePageFolders(purgePage);
+ }
+
+ private static void validatePageFolders(boolean purgePage) throws Exception
{
+ File dataFolder = new File(selectedDataFolder(), "paging");
+ if (purgePage) {
+ Wait.assertEquals(0, () ->
(Objects.requireNonNull(dataFolder.listFiles())).length, 5000, 100);
+ } else {
+ Wait.assertTrue(() ->
Objects.requireNonNull(dataFolder.listFiles()).length > 0, 5000, 100);
+ }
}
}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
index 53a9629046..cc8e11d66d 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PageTimedWriterUnitTest.java
@@ -248,7 +248,7 @@ public class PageTimedWriterUnitTest extends
ArtemisTestCase {
AddressSettings settings = new
AddressSettings().setPageSizeBytes(MAX_SIZE).setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
- pageStore = new PagingStoreImpl(ADDRESS, scheduledExecutorService, 100,
mockPagingManager, realJournalStorageManager, inMemoryFileFactory,
mockPageStoreFactory, ADDRESS, settings, executorFactory.getExecutor(), false) {
+ pageStore = new PagingStoreImpl(ADDRESS, scheduledExecutorService, 100,
mockPagingManager, realJournalStorageManager, inMemoryFileFactory,
mockPageStoreFactory, ADDRESS, settings, executorFactory.getExecutor(), false,
() -> true) {
@Override
protected PageTimedWriter
createPageTimedWriter(ScheduledExecutorService scheduledExecutor, long
syncTimeout) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact