This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 6c0a1f3da0 ARTEMIS-5750 Implement disk-full-policy address setting
6c0a1f3da0 is described below
commit 6c0a1f3da0b2d592424859a763f3dc37b2fc4952
Author: iliya <[email protected]>
AuthorDate: Fri Nov 14 17:13:18 2025 +0300
ARTEMIS-5750 Implement disk-full-policy address setting
---
.../core/settings/impl/DiskFullMessagePolicy.java | 21 +++
.../artemis/core/config/impl/Validators.java | 10 ++
.../deployers/impl/FileConfigurationParser.java | 6 +
.../artemis/core/paging/impl/PagingStoreImpl.java | 98 ++++++++++---
.../artemis/core/server/ActiveMQMessageBundle.java | 3 +
.../core/settings/impl/AddressSettings.java | 25 +++-
.../resources/schema/artemis-configuration.xsd | 15 ++
.../core/config/impl/ConfigurationImplTest.java | 3 +
.../config/impl/FileConfigurationParserTest.java | 20 +++
.../core/config/impl/FileConfigurationTest.java | 3 +
.../resources/ConfigurationTest-full-config.xml | 1 +
...rationTest-xinclude-config-address-settings.xml | 1 +
...est-xinclude-schema-config-address-settings.xml | 1 +
docs/user-manual/address-settings.adoc | 10 ++
.../integration/paging/DiskFullSendPagingTest.java | 151 +++++++++++++++++++++
.../unit/core/paging/impl/PagingStoreImplTest.java | 62 ++++++++-
16 files changed, 402 insertions(+), 28 deletions(-)
diff --git
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/DiskFullMessagePolicy.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/DiskFullMessagePolicy.java
new file mode 100644
index 0000000000..09833505a3
--- /dev/null
+++
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/DiskFullMessagePolicy.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.settings.impl;
+
+public enum DiskFullMessagePolicy {
+ DROP, BLOCK, FAIL
+}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index 338dc9fdbd..1b128cdb11 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
@@ -148,6 +149,15 @@ public final class Validators {
return value;
};
+ public static final Validator<String> DISK_FULL_MESSAGE_POLICY_TYPE =
(name, value) -> {
+ if (value == null ||
!value.equals(DiskFullMessagePolicy.DROP.toString()) &&
+ !value.equals(DiskFullMessagePolicy.BLOCK.toString()) &&
+ !value.equals(DiskFullMessagePolicy.FAIL.toString())) {
+ throw ActiveMQMessageBundle.BUNDLE.invalidDiskFullPolicyType(value);
+ }
+ return value;
+ };
+
public static final Validator<String> PAGE_FULL_MESSAGE_POLICY_TYPE =
(name, value) -> {
if (value == null ||
!value.equals(PageFullMessagePolicy.DROP.toString()) &&
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6503b2931d..ebb479b0c9 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -104,6 +104,7 @@ import
org.apache.activemq.artemis.core.server.routing.policies.PolicyFactoryRes
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@@ -128,6 +129,7 @@ import org.w3c.dom.NodeList;
import static
org.apache.activemq.artemis.core.config.impl.Validators.ADDRESS_FULL_MESSAGE_POLICY_TYPE;
import static
org.apache.activemq.artemis.core.config.impl.Validators.COMPONENT_ROUTING_TYPE;
import static
org.apache.activemq.artemis.core.config.impl.Validators.DELETION_POLICY_TYPE;
+import static
org.apache.activemq.artemis.core.config.impl.Validators.DISK_FULL_MESSAGE_POLICY_TYPE;
import static org.apache.activemq.artemis.core.config.impl.Validators.GE_ZERO;
import static org.apache.activemq.artemis.core.config.impl.Validators.GT_ZERO;
import static
org.apache.activemq.artemis.core.config.impl.Validators.JOURNAL_TYPE;
@@ -259,6 +261,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
private static final String PAGE_FULL_MESSAGE_POLICY_NODE_NAME =
"page-full-policy";
+ private static final String DISK_FULL_MESSAGE_POLICY_NODE_NAME =
"disk-full-policy";
+
private static final String MAX_READ_PAGE_BYTES_NODE_NAME =
"max-read-page-bytes";
private static final String PREFETCH_PAGE_BYTES_NODE_NAME =
"prefetch-page-bytes";
@@ -1368,6 +1372,8 @@ public final class FileConfigurationParser extends
XMLConfigurationUtil {
addressSettings.setMessageCounterHistoryDayLimit(XMLUtil.parseInt(child));
} else if
(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) {
addressSettings.setAddressFullMessagePolicy(Enum.valueOf(AddressFullMessagePolicy.class,
ADDRESS_FULL_MESSAGE_POLICY_TYPE.validate(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME,
getTrimmedTextContent(child))));
+ } else if (DISK_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name))
{
+
addressSettings.setDiskFullMessagePolicy(Enum.valueOf(DiskFullMessagePolicy.class,
DISK_FULL_MESSAGE_POLICY_TYPE.validate(DISK_FULL_MESSAGE_POLICY_NODE_NAME,
getTrimmedTextContent(child))));
} else if (PAGE_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name))
{
addressSettings.setPageFullMessagePolicy(Enum.valueOf(PageFullMessagePolicy.class,
PAGE_FULL_MESSAGE_POLICY_TYPE.validate(PAGE_FULL_MESSAGE_POLICY_NODE_NAME,
getTrimmedTextContent(child))));
} else if (LVQ_NODE_NAME.equalsIgnoreCase(name) ||
DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) {
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 0c1092d142..33b0bfe891 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -56,6 +56,7 @@ import
org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@@ -122,6 +123,8 @@ public class PagingStoreImpl implements PagingStore {
private PageFullMessagePolicy pageFullMessagePolicy;
+ private DiskFullMessagePolicy diskFullMessagePolicy;
+
private int pageSize;
private volatile AddressFullMessagePolicy addressFullMessagePolicy;
@@ -304,6 +307,8 @@ public class PagingStoreImpl implements PagingStore {
pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
+ diskFullMessagePolicy = addressSettings.getDiskFullMessagePolicy();
+
pageLimitBytes = addressSettings.getPageLimitBytes();
if (pageLimitBytes != null && pageLimitBytes < 0) {
@@ -1292,47 +1297,76 @@ public class PagingStoreImpl implements PagingStore {
return false;
}
- if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL &&
(maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize ||
pagingManager.isDiskFull())) {
- if (isFull()) {
- if (runOnFailure && runWhenAvailable != null) {
+ if (pagingManager.isDiskFull()) {
+ if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
+ if (runOnFailure) {
addToBlockList(runWhenAvailable, blockedCallback);
pagingManager.addBlockedStore(this);
}
return false;
}
- } else if (pagingManager.isDiskFull() || addressFullMessagePolicy ==
AddressFullMessagePolicy.BLOCK && (maxMessages != -1 || maxSize != -1 ||
usingGlobalMaxSize)) {
- if (pagingManager.isDiskFull() || this.full ||
pagingManager.isGlobalFull()) {
+
+ if (diskFullMessagePolicy == null || diskFullMessagePolicy ==
DiskFullMessagePolicy.BLOCK) {
if (runWhenBlocking != null) {
runWhenBlocking.run();
}
addToBlockList(runWhenAvailable, blockedCallback);
- // We check again to avoid a race condition where the size can
come down just after the element
- // has been added, but the check to execute was done before the
element was added
- // NOTE! We do not fix this race by locking the whole thing, doing
this check provides
- // MUCH better performance in a highly concurrent environment
- if (!pagingManager.isGlobalFull() && !full) {
- // run it now
+ // Avoid a race condition see description below
+ if (!pagingManager.isDiskFull()) {
runWhenAvailable.run();
onMemoryFreedRunnables.remove(runWhenAvailable);
} else {
- if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
- pagingManager.addBlockedStore(this);
- }
+ pagingManager.addBlockedStore(this);
if (!blocking) {
- if (pagingManager.isDiskFull()) {
- ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
- } else {
-
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, getPageInfo());
- }
+ ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
blocking = true;
}
}
return true;
}
+ } else {
+ if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL &&
(maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize)) {
+ if (isFull()) {
+ if (runOnFailure && runWhenAvailable != null) {
+ addToBlockList(runWhenAvailable, blockedCallback);
+ pagingManager.addBlockedStore(this);
+ }
+ return false;
+ }
+ } else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK
&& (maxMessages != -1 || maxSize != -1 || usingGlobalMaxSize)) {
+ if (this.full || pagingManager.isGlobalFull()) {
+ if (runWhenBlocking != null) {
+ runWhenBlocking.run();
+ }
+
+ addToBlockList(runWhenAvailable, blockedCallback);
+
+ // We check again to avoid a race condition where the size can
come down just after the element
+ // has been added, but the check to execute was done before the
element was added
+ // NOTE! We do not fix this race by locking the whole thing,
doing this check provides
+ // MUCH better performance in a highly concurrent environment
+ if (!pagingManager.isGlobalFull() && !full) {
+ // run it now
+ runWhenAvailable.run();
+ onMemoryFreedRunnables.remove(runWhenAvailable);
+ } else {
+ if (usingGlobalMaxSize) {
+ pagingManager.addBlockedStore(this);
+ }
+
+ if (!blocking) {
+
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, getPageInfo());
+ blocking = true;
+ }
+ }
+
+ return true;
+ }
+ }
}
if (runWhenAvailable != null) {
@@ -1400,6 +1434,28 @@ public class PagingStoreImpl implements PagingStore {
return -1;
}
+ boolean diskFull = pagingManager.isDiskFull();
+
+ if (diskFullMessagePolicy == DiskFullMessagePolicy.DROP ||
diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
+ if (diskFull) {
+ if (message.isLargeMessage()) {
+ ((LargeServerMessage) message).deleteFile();
+ }
+
+ if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
+ throw
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+ }
+
+ // Dist is full, just drop the data
+ if (!printedDropMessagesWarning) {
+ printedDropMessagesWarning = true;
+ ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName,
getPageInfo());
+ }
+
+ return 0;
+ }
+ }
+
boolean full = isFull();
if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP ||
addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
@@ -1444,9 +1500,7 @@ public class PagingStoreImpl implements PagingStore {
return 0;
}
- int creditsUsed = writePage(message, tx, listCtx, pageDecorator,
useFlowControl);
-
- return creditsUsed;
+ return writePage(message, tx, listCtx, pageDecorator, useFlowControl);
}
private int writePage(Message message,
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 4a81abaaf2..cbe1417f9d 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -538,4 +538,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229258, value = "Invalid cluster bridge message! No queue IDs
defined in the property {}")
ActiveMQIllegalStateException noQueueIdsDefined(SimpleString idsHeaderName);
+ @Message(id = 229259, value = "Invalid disk full message policy type {}")
+ IllegalArgumentException invalidDiskFullPolicyType(String val);
+
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 97e854665e..1189d2849b 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -193,6 +193,11 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
}
private PageFullMessagePolicy pageFullMessagePolicy = null;
+ static {
+ metaBean.add(DiskFullMessagePolicy.class, "diskFullMessagePolicy", (t,
p) -> t.diskFullMessagePolicy = p, t -> t.diskFullMessagePolicy);
+ }
+ private DiskFullMessagePolicy diskFullMessagePolicy = null;
+
static {
metaBean.add(Long.class, "maxSizeMessages", (t, p) -> t.maxSizeMessages
= p, t -> t.maxSizeMessages);
}
@@ -910,6 +915,15 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
return this;
}
+ public DiskFullMessagePolicy getDiskFullMessagePolicy() {
+ return diskFullMessagePolicy;
+ }
+
+ public AddressSettings setDiskFullMessagePolicy(DiskFullMessagePolicy
policy) {
+ this.diskFullMessagePolicy = policy;
+ return this;
+ }
+
public int getMaxReadPageBytes() {
return Objects.requireNonNullElse(maxReadPageBytes, 2 *
getPageSizeBytes());
}
@@ -1635,6 +1649,7 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
Objects.equals(pageLimitBytes, other.pageLimitBytes) &&
Objects.equals(pageLimitMessages, other.pageLimitMessages) &&
Objects.equals(pageFullMessagePolicy,
other.pageFullMessagePolicy) &&
+ Objects.equals(diskFullMessagePolicy,
other.diskFullMessagePolicy) &&
Objects.equals(maxSizeMessages, other.maxSizeMessages) &&
Objects.equals(pageSizeBytes, other.pageSizeBytes) &&
Objects.equals(pageCacheMaxSize, other.pageCacheMaxSize) &&
@@ -1710,10 +1725,10 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
public int hashCode() {
return Objects.hash(addressFullMessagePolicy, maxSizeBytes,
maxReadPageBytes, maxReadPageMessages,
prefetchPageBytes, prefetchPageMessages,
pageLimitBytes, pageLimitMessages,
- pageFullMessagePolicy, maxSizeMessages,
pageSizeBytes, pageCacheMaxSize, maxDeliveryAttempts,
- messageCounterHistoryDayLimit, redeliveryDelay,
redeliveryMultiplier,
- redeliveryCollisionAvoidanceFactor,
maxRedeliveryDelay, deadLetterAddress, expiryAddress,
- expiryDelay, minExpiryDelay, maxExpiryDelay,
noExpiry, defaultLastValueQueue,
+ pageFullMessagePolicy, diskFullMessagePolicy,
maxSizeMessages, pageSizeBytes, pageCacheMaxSize,
+ maxDeliveryAttempts, messageCounterHistoryDayLimit,
redeliveryDelay,
+ redeliveryMultiplier,
redeliveryCollisionAvoidanceFactor, maxRedeliveryDelay, deadLetterAddress,
+ expiryAddress, expiryDelay, minExpiryDelay,
maxExpiryDelay, noExpiry, defaultLastValueQueue,
defaultLastValueKey, defaultNonDestructive,
defaultExclusiveQueue, defaultGroupRebalance,
defaultGroupRebalancePauseDispatch,
defaultGroupBuckets, defaultGroupFirstKey,
redistributionDelay, sendToDLAOnNoRoute,
slowConsumerThreshold,
@@ -1734,7 +1749,7 @@ public class AddressSettings implements
Mergeable<AddressSettings>, Serializable
@Override
public String toString() {
- return "AddressSettings{" + "addressFullMessagePolicy=" +
addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ",
maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" +
maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ",
prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" +
pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ",
pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" +
maxSizeMessages + [...]
+ return "AddressSettings{" + "addressFullMessagePolicy=" +
addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ",
maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" +
maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ",
prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" +
pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ",
pageFullMessagePolicy=" + pageFullMessagePolicy + ", diskFullMessagePolicy=" +
diskFullMess [...]
+ '}';
}
}
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5e87d2e00f..fd9fbe07c6 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -4307,6 +4307,21 @@
</xsd:simpleType>
</xsd:element>
+ <xsd:element name="disk-full-policy" maxOccurs="1" minOccurs="0">
+ <xsd:annotation>
+ <xsd:documentation>
+ what happens when disk becomes full
+ </xsd:documentation>
+ </xsd:annotation>
+ <xsd:simpleType>
+ <xsd:restriction base="xsd:string">
+ <xsd:enumeration value="DROP"/>
+ <xsd:enumeration value="FAIL"/>
+ <xsd:enumeration value="BLOCK"/>
+ </xsd:restriction>
+ </xsd:simpleType>
+ </xsd:element>
+
<xsd:element name="page-full-policy" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 6fc6996a77..eec20a5cad 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -101,6 +101,7 @@ import
org.apache.activemq.artemis.core.server.routing.KeyType;
import
org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
import
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
@@ -1704,6 +1705,7 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
properties.put("addressSettings.NeedToSet.defaultQueueRoutingType",
RoutingType.ANYCAST);
properties.put("addressSettings.NeedToSet.autoDeleteQueuesMessageCount",
6789);
properties.put("addressSettings.NeedToSet.addressFullMessagePolicy",
AddressFullMessagePolicy.DROP);
+ properties.put("addressSettings.NeedToSet.diskFullMessagePolicy",
DiskFullMessagePolicy.DROP);
properties.put("addressSettings.NeedToSet.maxSizeBytes", 6666);
properties.put("addressSettings.NeedToSet.redistributionDelay", 22);
properties.put("addressSettings.NeedToSet.maxSizeBytesRejectThreshold",
12334);
@@ -1773,6 +1775,7 @@ public class ConfigurationImplTest extends
AbstractConfigurationTestBase {
assertEquals(RoutingType.ANYCAST,
configuration.getAddressSettings().get("NeedToSet").getDefaultQueueRoutingType());
assertEquals(6789,
configuration.getAddressSettings().get("NeedToSet").getAutoDeleteQueuesMessageCount());
assertEquals(AddressFullMessagePolicy.DROP,
configuration.getAddressSettings().get("NeedToSet").getAddressFullMessagePolicy());
+ assertEquals(DiskFullMessagePolicy.DROP,
configuration.getAddressSettings().get("NeedToSet").getDiskFullMessagePolicy());
assertEquals(6666,
configuration.getAddressSettings().get("NeedToSet").getMaxSizeBytes());
assertEquals(22,
configuration.getAddressSettings().get("NeedToSet").getRedistributionDelay());
assertEquals(12334,
configuration.getAddressSettings().get("NeedToSet").getMaxSizeBytesRejectThreshold());
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 5461097752..53b07421d7 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -48,6 +48,7 @@ import
org.apache.activemq.artemis.core.config.ha.SharedStorePrimaryPolicyConfig
import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
import org.apache.activemq.artemis.tests.util.ServerTestBase;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
@@ -1068,4 +1069,23 @@ public class FileConfigurationParserTest extends
ServerTestBase {
assertEquals("#", match.getAddressMatch());
assertEquals("myQueue", match.getQueueMatch());
}
+
+ @Test
+ public void testParsingDiskFullPolicy() throws Exception {
+ String configStr = """
+ <configuration>
+ <address-settings>
+ <address-setting match="foo">
+ <disk-full-policy>FAIL</disk-full-policy>
+ </address-setting>
+ </address-settings>
+ </configuration>""";
+
+ FileConfigurationParser parser = new FileConfigurationParser();
+ ByteArrayInputStream input = new
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+ Configuration configuration = parser.parseMainConfig(input);
+ AddressSettings settings = configuration.getAddressSettings().get("foo");
+ assertEquals(DiskFullMessagePolicy.FAIL,
settings.getDiskFullMessagePolicy());
+ }
}
diff --git
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index b59bc1ccbb..43cf9d1896 100644
---
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -80,6 +80,7 @@ import
org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlug
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
import
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
@@ -519,6 +520,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertEquals(2L, (long)
configInstance.getAddressSettings().get("a1").getMinExpiryDelay());
assertEquals(3L, (long)
configInstance.getAddressSettings().get("a1").getMaxExpiryDelay());
assertTrue(configInstance.getAddressSettings().get("a1").isNoExpiry());
+ assertEquals(DiskFullMessagePolicy.DROP,
configInstance.getAddressSettings().get("a1").getDiskFullMessagePolicy());
assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_EXPIRY_RESOURCES,
configInstance.getAddressSettings().get("a1").isAutoCreateExpiryResources());
assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX,
configInstance.getAddressSettings().get("a1").getExpiryQueuePrefix());
assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX,
configInstance.getAddressSettings().get("a1").getExpiryQueueSuffix());
@@ -561,6 +563,7 @@ public class FileConfigurationTest extends
AbstractConfigurationTestBase {
assertEquals(-1L, (long)
configInstance.getAddressSettings().get("a2").getMinExpiryDelay());
assertEquals(-1L, (long)
configInstance.getAddressSettings().get("a2").getMaxExpiryDelay());
assertFalse(configInstance.getAddressSettings().get("a2").isNoExpiry());
+
assertNull(configInstance.getAddressSettings().get("a2").getDiskFullMessagePolicy());
assertTrue(configInstance.getAddressSettings().get("a2").isAutoCreateDeadLetterResources());
assertEquals("",
configInstance.getAddressSettings().get("a2").getExpiryQueuePrefix().toString());
assertEquals(".EXP",
configInstance.getAddressSettings().get("a2").getExpiryQueueSuffix().toString());
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index b9935e85bb..14b76645c8 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -619,6 +619,7 @@
<min-expiry-delay>2</min-expiry-delay>
<max-expiry-delay>3</max-expiry-delay>
<no-expiry>true</no-expiry>
+ <disk-full-policy>DROP</disk-full-policy>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 217e42dba2..eb4e3d4577 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -22,6 +22,7 @@
<min-expiry-delay>2</min-expiry-delay>
<max-expiry-delay>3</max-expiry-delay>
<no-expiry>true</no-expiry>
+ <disk-full-policy>DROP</disk-full-policy>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
diff --git
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
index 217e42dba2..eb4e3d4577 100644
---
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
+++
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
@@ -22,6 +22,7 @@
<min-expiry-delay>2</min-expiry-delay>
<max-expiry-delay>3</max-expiry-delay>
<no-expiry>true</no-expiry>
+ <disk-full-policy>DROP</disk-full-policy>
<redelivery-delay>1</redelivery-delay>
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
<max-size-bytes>817M</max-size-bytes>
diff --git a/docs/user-manual/address-settings.adoc
b/docs/user-manual/address-settings.adoc
index cbdf27484e..5b31140596 100644
--- a/docs/user-manual/address-settings.adoc
+++ b/docs/user-manual/address-settings.adoc
@@ -46,6 +46,7 @@ Here an example of an `address-setting` entry that might be
found in the `broker
<max-size-bytes-reject-threshold>-1</max-size-bytes-reject-threshold>
<page-size-bytes>10MB</page-size-bytes>
<address-full-policy>PAGE</address-full-policy>
+ <disk-full-policy>BLOCK</disk-full-policy>
<message-counter-history-day-limit></message-counter-history-day-limit>
<last-value-queue>false</last-value-queue> <!-- deprecated! see
default-last-value-queue -->
<default-last-value-queue>false</default-last-value-queue>
@@ -188,6 +189,15 @@ If the value is `FAIL` then further messages will be
dropped and an exception wi
If the value is `BLOCK` then client message producers will block when they try
and send further messages.
See the xref:flow-control.adoc#flow-control[Flow Control] and
xref:paging.adoc#paging[Paging] chapters for more info.
+disk-full-policy::
+This attribute can have one of the following values: `DROP`, `FAIL` or `BLOCK`
and determines what happens when a disk becomes full.
+The default value is `BLOCK`.
+If the value is `DROP` then further messages will be silently dropped.
+If the value is `FAIL` then further messages will be dropped and an exception
will be thrown on the client-side.
+If the value is `BLOCK` then client message producers will block when they try
and send further messages.
+See the xref:flow-control.adoc#flow-control[Flow Control],
xref:paging.adoc#paging[Paging] and xref:paging.adoc#monitoring-disk[Monitoring
Disk] chapters for more info.
+
+
message-counter-history-day-limit::
is the number of days to keep message counter history for this address
assuming that `message-counter-enabled` is `true`.
Default is `0`.
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/DiskFullSendPagingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/DiskFullSendPagingTest.java
new file mode 100644
index 0000000000..cd21f6d768
--- /dev/null
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/DiskFullSendPagingTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DiskFullSendPagingTest extends ActiveMQTestBase {
+
+ @Test
+ public void testSendMessagesOnDiskFullWithDropPolicy() throws Exception {
+ ActiveMQServerImpl server =
createServerWithDiskFullPolicy(DiskFullMessagePolicy.DROP);
+
+ server.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
+ server.getMonitor().tick();
+
+ CountDownLatch done = new CountDownLatch(1);
+ AtomicInteger errors = new AtomicInteger();
+
+ String address = getTestMethodName();
+
+ Thread t = new Thread(() -> {
+ try {
+ trySendMessage(address);
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+
+ t.start();
+
+ assertTrue(done.await(200, TimeUnit.MILLISECONDS));
+ assertEquals(0, errors.get());
+ }
+
+ @Test
+ public void testSendMessagesOnDiskFullWithFailPolicy() throws Exception {
+ ActiveMQServerImpl server =
createServerWithDiskFullPolicy(DiskFullMessagePolicy.FAIL);
+
+ server.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
+ server.getMonitor().tick();
+
+ CountDownLatch done = new CountDownLatch(1);
+ AtomicInteger errors = new AtomicInteger();
+
+ String address = getTestMethodName();
+
+ Thread t = new Thread(() -> {
+ try {
+ trySendMessage(address);
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+
+ t.start();
+
+ assertTrue(done.await(200, TimeUnit.MILLISECONDS));
+ assertEquals(1, errors.get());
+ }
+
+ @Test
+ public void testSendMessagesOnDiskFullWithBlockPolicy() throws Exception {
+ ActiveMQServerImpl server =
createServerWithDiskFullPolicy(DiskFullMessagePolicy.BLOCK);
+
+ server.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
+ server.getMonitor().tick();
+
+ CountDownLatch done = new CountDownLatch(1);
+ AtomicInteger errors = new AtomicInteger();
+
+ String address = getTestMethodName();
+
+ Thread t = new Thread(() -> {
+ try {
+ trySendMessage(address);
+ } catch (Exception e) {
+ errors.incrementAndGet();
+ } finally {
+ done.countDown();
+ }
+ });
+
+ t.start();
+
+ assertFalse(done.await(200, TimeUnit.MILLISECONDS));
+ assertEquals(0, errors.get());
+
+ server.getMonitor().setMaxUsage(1); // release the disk
+ server.getMonitor().tick();
+
+ assertTrue(done.await(200, TimeUnit.MILLISECONDS));
+ assertEquals(0, errors.get());
+ }
+
+ private ActiveMQServerImpl
createServerWithDiskFullPolicy(DiskFullMessagePolicy policy) throws Exception {
+ ActiveMQServerImpl server = (ActiveMQServerImpl) createServer(true);
+ server.getAddressSettingsRepository().addMatch("#", new
AddressSettings().setDiskFullMessagePolicy(policy));
+ server.start();
+
+ waitForServerToStart(server);
+ return server;
+ }
+
+ private void trySendMessage(String address) throws Exception {
+ try (ClientSessionFactory factory =
createSessionFactory(createFactory(false))) {
+ try (ClientSession session = factory.createSession()) {
+ session.createQueue(QueueConfiguration.of(address));
+
+ ClientProducer producer = session.createProducer(address);
+
+ producer.send(createBytesMessage(session,
ActiveMQBytesMessage.TYPE, new byte[1024], false));
+ }
+ }
+ }
+}
diff --git
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 1e5c1d4ef6..78baaca17e 100644
---
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -63,6 +63,7 @@ import
org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
@@ -1095,6 +1096,65 @@ public class PagingStoreImplTest extends
ActiveMQTestBase {
}
}
+ @Test
+ public void testCheckMemoryOnDiskFull() throws Exception {
+ SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+ PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+ PagingManager mockManager = Mockito.mock(FakePagingManager.class);
+
+ ArtemisExecutor sameThreadExecutor = Runnable::run;
+ PagingStoreImpl store = new
PagingStoreImpl(PagingStoreImplTest.destinationTestName,
scheduledExecutorService, 100,
+ mockManager, nullStorageManager, factory, storeFactory,
+ PagingStoreImplTest.destinationTestName,
+ new AddressSettings(),
+ sameThreadExecutor, true);
+
+ store.start();
+ try {
+ // isDiskFull called twice when no disk full message policy is set or
when disk full message policy is BLOCK
+ Mockito.when(mockManager.isDiskFull()).thenReturn(
+ true, true, // null
+ true, true, // BLOCK
+ true, // FAIL
+ true, // DROP
+ false
+ );
+
+ CountingRunnable trackMemoryCheck = new CountingRunnable();
+
+ // No disk full policy
+ assertTrue(store.checkMemory(trackMemoryCheck, null));
+ assertEquals(0, trackMemoryCheck.getCount());
+
+ // Disk full policy BLOCK
+ store.applySetting(new
AddressSettings().setDiskFullMessagePolicy(DiskFullMessagePolicy.BLOCK));
+
+ assertTrue(store.checkMemory(trackMemoryCheck, null));
+ assertEquals(0, trackMemoryCheck.getCount());
+
+ // Disk full policy FAIL
+ store.applySetting(new
AddressSettings().setDiskFullMessagePolicy(DiskFullMessagePolicy.FAIL));
+
+ assertFalse(store.checkMemory(trackMemoryCheck, null));
+ assertEquals(0, trackMemoryCheck.getCount());
+
+ // Disk full policy DROP
+ store.applySetting(new
AddressSettings().setDiskFullMessagePolicy(DiskFullMessagePolicy.DROP));
+
+ assertTrue(store.checkMemory(trackMemoryCheck, null));
+ assertEquals(1, trackMemoryCheck.getCount());
+
+ // Release blocks
+ assertTrue(store.checkReleasedMemory());
+ assertEquals(4, trackMemoryCheck.getCount());
+
+ } finally {
+ store.stop();
+ }
+ }
+
protected PagingManager createMockManager() {
return new FakePagingManager();
}
@@ -1263,7 +1323,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase
{
// Do another check, this time indicate the disk is full during the
first couple
// requests, making the task initially be retained for later but then
executed.
final CountingRunnable trackMemoryCheck2 = new CountingRunnable();
- Mockito.when(mockManager.isDiskFull()).thenReturn(true, true, false);
+ Mockito.when(mockManager.isDiskFull()).thenReturn(true, false);
assertEquals(0, trackMemoryCheck2.getCount());
store.checkMemory(trackMemoryCheck2, null);
assertEquals(1, trackMemoryCheck2.getCount());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact