This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c0a1f3da0 ARTEMIS-5750 Implement disk-full-policy address setting
6c0a1f3da0 is described below

commit 6c0a1f3da0b2d592424859a763f3dc37b2fc4952
Author: iliya <[email protected]>
AuthorDate: Fri Nov 14 17:13:18 2025 +0300

    ARTEMIS-5750 Implement disk-full-policy address setting
---
 .../core/settings/impl/DiskFullMessagePolicy.java  |  21 +++
 .../artemis/core/config/impl/Validators.java       |  10 ++
 .../deployers/impl/FileConfigurationParser.java    |   6 +
 .../artemis/core/paging/impl/PagingStoreImpl.java  |  98 ++++++++++---
 .../artemis/core/server/ActiveMQMessageBundle.java |   3 +
 .../core/settings/impl/AddressSettings.java        |  25 +++-
 .../resources/schema/artemis-configuration.xsd     |  15 ++
 .../core/config/impl/ConfigurationImplTest.java    |   3 +
 .../config/impl/FileConfigurationParserTest.java   |  20 +++
 .../core/config/impl/FileConfigurationTest.java    |   3 +
 .../resources/ConfigurationTest-full-config.xml    |   1 +
 ...rationTest-xinclude-config-address-settings.xml |   1 +
 ...est-xinclude-schema-config-address-settings.xml |   1 +
 docs/user-manual/address-settings.adoc             |  10 ++
 .../integration/paging/DiskFullSendPagingTest.java | 151 +++++++++++++++++++++
 .../unit/core/paging/impl/PagingStoreImplTest.java |  62 ++++++++-
 16 files changed, 402 insertions(+), 28 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/DiskFullMessagePolicy.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/DiskFullMessagePolicy.java
new file mode 100644
index 0000000000..09833505a3
--- /dev/null
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/settings/impl/DiskFullMessagePolicy.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.settings.impl;
+
+public enum DiskFullMessagePolicy {
+   DROP, BLOCK, FAIL
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
index 338dc9fdbd..1b128cdb11 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/Validators.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import 
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
@@ -148,6 +149,15 @@ public final class Validators {
       return value;
    };
 
+   public static final Validator<String> DISK_FULL_MESSAGE_POLICY_TYPE = 
(name, value) -> {
+      if (value == null || 
!value.equals(DiskFullMessagePolicy.DROP.toString()) &&
+              !value.equals(DiskFullMessagePolicy.BLOCK.toString()) &&
+              !value.equals(DiskFullMessagePolicy.FAIL.toString())) {
+         throw ActiveMQMessageBundle.BUNDLE.invalidDiskFullPolicyType(value);
+      }
+      return value;
+   };
+
    public static final Validator<String> PAGE_FULL_MESSAGE_POLICY_TYPE = 
(name, value) -> {
       if (value == null ||
          !value.equals(PageFullMessagePolicy.DROP.toString()) &&
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 6503b2931d..ebb479b0c9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -104,6 +104,7 @@ import 
org.apache.activemq.artemis.core.server.routing.policies.PolicyFactoryRes
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
@@ -128,6 +129,7 @@ import org.w3c.dom.NodeList;
 import static 
org.apache.activemq.artemis.core.config.impl.Validators.ADDRESS_FULL_MESSAGE_POLICY_TYPE;
 import static 
org.apache.activemq.artemis.core.config.impl.Validators.COMPONENT_ROUTING_TYPE;
 import static 
org.apache.activemq.artemis.core.config.impl.Validators.DELETION_POLICY_TYPE;
+import static 
org.apache.activemq.artemis.core.config.impl.Validators.DISK_FULL_MESSAGE_POLICY_TYPE;
 import static org.apache.activemq.artemis.core.config.impl.Validators.GE_ZERO;
 import static org.apache.activemq.artemis.core.config.impl.Validators.GT_ZERO;
 import static 
org.apache.activemq.artemis.core.config.impl.Validators.JOURNAL_TYPE;
@@ -259,6 +261,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
 
    private static final String PAGE_FULL_MESSAGE_POLICY_NODE_NAME = 
"page-full-policy";
 
+   private static final String DISK_FULL_MESSAGE_POLICY_NODE_NAME = 
"disk-full-policy";
+
    private static final String MAX_READ_PAGE_BYTES_NODE_NAME = 
"max-read-page-bytes";
 
    private static final String PREFETCH_PAGE_BYTES_NODE_NAME = 
"prefetch-page-bytes";
@@ -1368,6 +1372,8 @@ public final class FileConfigurationParser extends 
XMLConfigurationUtil {
             
addressSettings.setMessageCounterHistoryDayLimit(XMLUtil.parseInt(child));
          } else if 
(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) {
             
addressSettings.setAddressFullMessagePolicy(Enum.valueOf(AddressFullMessagePolicy.class,
 
ADDRESS_FULL_MESSAGE_POLICY_TYPE.validate(ADDRESS_FULL_MESSAGE_POLICY_NODE_NAME,
 getTrimmedTextContent(child))));
+         } else if (DISK_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) 
{
+            
addressSettings.setDiskFullMessagePolicy(Enum.valueOf(DiskFullMessagePolicy.class,
 DISK_FULL_MESSAGE_POLICY_TYPE.validate(DISK_FULL_MESSAGE_POLICY_NODE_NAME, 
getTrimmedTextContent(child))));
          } else if (PAGE_FULL_MESSAGE_POLICY_NODE_NAME.equalsIgnoreCase(name)) 
{
             
addressSettings.setPageFullMessagePolicy(Enum.valueOf(PageFullMessagePolicy.class,
 PAGE_FULL_MESSAGE_POLICY_TYPE.validate(PAGE_FULL_MESSAGE_POLICY_NODE_NAME, 
getTrimmedTextContent(child))));
          } else if (LVQ_NODE_NAME.equalsIgnoreCase(name) || 
DEFAULT_LVQ_NODE_NAME.equalsIgnoreCase(name)) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 0c1092d142..33b0bfe891 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -56,6 +56,7 @@ import 
org.apache.activemq.artemis.core.server.RouteContextList;
 import org.apache.activemq.artemis.core.server.impl.MessageReferenceImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.TransactionOperation;
@@ -122,6 +123,8 @@ public class PagingStoreImpl implements PagingStore {
 
    private PageFullMessagePolicy pageFullMessagePolicy;
 
+   private DiskFullMessagePolicy diskFullMessagePolicy;
+
    private int pageSize;
 
    private volatile AddressFullMessagePolicy addressFullMessagePolicy;
@@ -304,6 +307,8 @@ public class PagingStoreImpl implements PagingStore {
 
       pageFullMessagePolicy = addressSettings.getPageFullMessagePolicy();
 
+      diskFullMessagePolicy = addressSettings.getDiskFullMessagePolicy();
+
       pageLimitBytes = addressSettings.getPageLimitBytes();
 
       if (pageLimitBytes != null && pageLimitBytes < 0) {
@@ -1292,47 +1297,76 @@ public class PagingStoreImpl implements PagingStore {
          return false;
       }
 
-      if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && 
(maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize || 
pagingManager.isDiskFull())) {
-         if (isFull()) {
-            if (runOnFailure && runWhenAvailable != null) {
+      if (pagingManager.isDiskFull()) {
+         if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
+            if (runOnFailure) {
                addToBlockList(runWhenAvailable, blockedCallback);
                pagingManager.addBlockedStore(this);
             }
             return false;
          }
-      } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == 
AddressFullMessagePolicy.BLOCK && (maxMessages != -1 || maxSize != -1 || 
usingGlobalMaxSize)) {
-         if (pagingManager.isDiskFull() || this.full || 
pagingManager.isGlobalFull()) {
+
+         if (diskFullMessagePolicy == null || diskFullMessagePolicy == 
DiskFullMessagePolicy.BLOCK) {
             if (runWhenBlocking != null) {
                runWhenBlocking.run();
             }
 
             addToBlockList(runWhenAvailable, blockedCallback);
 
-            // We check again to avoid a race condition where the size can 
come down just after the element
-            // has been added, but the check to execute was done before the 
element was added
-            // NOTE! We do not fix this race by locking the whole thing, doing 
this check provides
-            // MUCH better performance in a highly concurrent environment
-            if (!pagingManager.isGlobalFull() && !full) {
-               // run it now
+            // Avoid a race condition (see the description below)
+            if (!pagingManager.isDiskFull()) {
                runWhenAvailable.run();
                onMemoryFreedRunnables.remove(runWhenAvailable);
             } else {
-               if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
-                  pagingManager.addBlockedStore(this);
-               }
+               pagingManager.addBlockedStore(this);
 
                if (!blocking) {
-                  if (pagingManager.isDiskFull()) {
-                     ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
-                  } else {
-                     
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, getPageInfo());
-                  }
+                  ActiveMQServerLogger.LOGGER.blockingDiskFull(address);
                   blocking = true;
                }
             }
 
             return true;
          }
+      } else {
+         if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && 
(maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize)) {
+            if (isFull()) {
+               if (runOnFailure && runWhenAvailable != null) {
+                  addToBlockList(runWhenAvailable, blockedCallback);
+                  pagingManager.addBlockedStore(this);
+               }
+               return false;
+            }
+         } else if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK 
&& (maxMessages != -1 || maxSize != -1 || usingGlobalMaxSize)) {
+            if (this.full || pagingManager.isGlobalFull()) {
+               if (runWhenBlocking != null) {
+                  runWhenBlocking.run();
+               }
+
+               addToBlockList(runWhenAvailable, blockedCallback);
+
+               // We check again to avoid a race condition where the size can 
come down just after the element
+               // has been added, but the check to execute was done before the 
element was added
+               // NOTE! We do not fix this race by locking the whole thing, 
doing this check provides
+               // MUCH better performance in a highly concurrent environment
+               if (!pagingManager.isGlobalFull() && !full) {
+                  // run it now
+                  runWhenAvailable.run();
+                  onMemoryFreedRunnables.remove(runWhenAvailable);
+               } else {
+                  if (usingGlobalMaxSize) {
+                     pagingManager.addBlockedStore(this);
+                  }
+
+                  if (!blocking) {
+                     
ActiveMQServerLogger.LOGGER.blockingMessageProduction(address, getPageInfo());
+                     blocking = true;
+                  }
+               }
+
+               return true;
+            }
+         }
       }
 
       if (runWhenAvailable != null) {
@@ -1400,6 +1434,28 @@ public class PagingStoreImpl implements PagingStore {
          return -1;
       }
 
+      boolean diskFull = pagingManager.isDiskFull();
+
+      if (diskFullMessagePolicy == DiskFullMessagePolicy.DROP || 
diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
+         if (diskFull) {
+            if (message.isLargeMessage()) {
+               ((LargeServerMessage) message).deleteFile();
+            }
+
+            if (diskFullMessagePolicy == DiskFullMessagePolicy.FAIL) {
+               throw 
ActiveMQMessageBundle.BUNDLE.addressIsFull(address.toString());
+            }
+
+            // Disk is full, just drop the data
+            if (!printedDropMessagesWarning) {
+               printedDropMessagesWarning = true;
+               ActiveMQServerLogger.LOGGER.pageStoreDropMessages(storeName, 
getPageInfo());
+            }
+
+            return 0;
+         }
+      }
+
       boolean full = isFull();
 
       if (addressFullMessagePolicy == AddressFullMessagePolicy.DROP || 
addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
@@ -1444,9 +1500,7 @@ public class PagingStoreImpl implements PagingStore {
          return 0;
       }
 
-      int creditsUsed = writePage(message, tx, listCtx, pageDecorator, 
useFlowControl);
-
-      return creditsUsed;
+      return writePage(message, tx, listCtx, pageDecorator, useFlowControl);
    }
 
    private int writePage(Message message,
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 4a81abaaf2..cbe1417f9d 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -538,4 +538,7 @@ public interface ActiveMQMessageBundle {
    @Message(id = 229258, value = "Invalid cluster bridge message! No queue IDs 
defined in the property {}")
    ActiveMQIllegalStateException noQueueIdsDefined(SimpleString idsHeaderName);
 
+   @Message(id = 229259, value = "Invalid disk full message policy type {}")
+   IllegalArgumentException invalidDiskFullPolicyType(String val);
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 97e854665e..1189d2849b 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -193,6 +193,11 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
    }
    private PageFullMessagePolicy pageFullMessagePolicy = null;
 
+   static {
+      metaBean.add(DiskFullMessagePolicy.class, "diskFullMessagePolicy", (t, 
p) -> t.diskFullMessagePolicy = p, t -> t.diskFullMessagePolicy);
+   }
+   private DiskFullMessagePolicy diskFullMessagePolicy = null;
+
    static {
       metaBean.add(Long.class, "maxSizeMessages", (t, p) -> t.maxSizeMessages 
= p, t -> t.maxSizeMessages);
    }
@@ -910,6 +915,15 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public DiskFullMessagePolicy getDiskFullMessagePolicy() {
+      return diskFullMessagePolicy;
+   }
+
+   public AddressSettings setDiskFullMessagePolicy(DiskFullMessagePolicy 
policy) {
+      this.diskFullMessagePolicy = policy;
+      return this;
+   }
+
    public int getMaxReadPageBytes() {
       return Objects.requireNonNullElse(maxReadPageBytes, 2 * 
getPageSizeBytes());
    }
@@ -1635,6 +1649,7 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
              Objects.equals(pageLimitBytes, other.pageLimitBytes) &&
              Objects.equals(pageLimitMessages, other.pageLimitMessages) &&
              Objects.equals(pageFullMessagePolicy, 
other.pageFullMessagePolicy) &&
+             Objects.equals(diskFullMessagePolicy, 
other.diskFullMessagePolicy) &&
              Objects.equals(maxSizeMessages, other.maxSizeMessages) &&
              Objects.equals(pageSizeBytes, other.pageSizeBytes) &&
              Objects.equals(pageCacheMaxSize, other.pageCacheMaxSize) &&
@@ -1710,10 +1725,10 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
    public int hashCode() {
       return Objects.hash(addressFullMessagePolicy, maxSizeBytes, 
maxReadPageBytes, maxReadPageMessages,
                           prefetchPageBytes, prefetchPageMessages, 
pageLimitBytes, pageLimitMessages,
-                          pageFullMessagePolicy, maxSizeMessages, 
pageSizeBytes, pageCacheMaxSize, maxDeliveryAttempts,
-                          messageCounterHistoryDayLimit, redeliveryDelay, 
redeliveryMultiplier,
-                          redeliveryCollisionAvoidanceFactor, 
maxRedeliveryDelay, deadLetterAddress, expiryAddress,
-                          expiryDelay, minExpiryDelay, maxExpiryDelay, 
noExpiry, defaultLastValueQueue,
+                          pageFullMessagePolicy, diskFullMessagePolicy, 
maxSizeMessages, pageSizeBytes, pageCacheMaxSize,
+                          maxDeliveryAttempts, messageCounterHistoryDayLimit, 
redeliveryDelay,
+                          redeliveryMultiplier, 
redeliveryCollisionAvoidanceFactor, maxRedeliveryDelay, deadLetterAddress,
+                          expiryAddress, expiryDelay, minExpiryDelay, 
maxExpiryDelay, noExpiry, defaultLastValueQueue,
                           defaultLastValueKey, defaultNonDestructive, 
defaultExclusiveQueue, defaultGroupRebalance,
                           defaultGroupRebalancePauseDispatch, 
defaultGroupBuckets, defaultGroupFirstKey,
                           redistributionDelay, sendToDLAOnNoRoute, 
slowConsumerThreshold,
@@ -1734,7 +1749,7 @@ public class AddressSettings implements 
Mergeable<AddressSettings>, Serializable
 
    @Override
    public String toString() {
-      return "AddressSettings{" + "addressFullMessagePolicy=" + 
addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", 
maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + 
maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", 
prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + 
pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", 
pageFullMessagePolicy=" + pageFullMessagePolicy + ", maxSizeMessages=" + 
maxSizeMessages +  [...]
+      return "AddressSettings{" + "addressFullMessagePolicy=" + 
addressFullMessagePolicy + ", maxSizeBytes=" + maxSizeBytes + ", 
maxReadPageBytes=" + maxReadPageBytes + ", maxReadPageMessages=" + 
maxReadPageMessages + ", prefetchPageBytes=" + prefetchPageBytes + ", 
prefetchPageMessages=" + prefetchPageMessages + ", pageLimitBytes=" + 
pageLimitBytes + ", pageLimitMessages=" + pageLimitMessages + ", 
pageFullMessagePolicy=" + pageFullMessagePolicy + ", diskFullMessagePolicy=" + 
diskFullMess [...]
              + '}';
    }
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd 
b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 5e87d2e00f..fd9fbe07c6 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -4307,6 +4307,21 @@
             </xsd:simpleType>
          </xsd:element>
 
+         <xsd:element name="disk-full-policy" maxOccurs="1" minOccurs="0">
+            <xsd:annotation>
+               <xsd:documentation>
+                  what happens when the disk becomes full
+               </xsd:documentation>
+            </xsd:annotation>
+            <xsd:simpleType>
+               <xsd:restriction base="xsd:string">
+                  <xsd:enumeration value="DROP"/>
+                  <xsd:enumeration value="FAIL"/>
+                  <xsd:enumeration value="BLOCK"/>
+               </xsd:restriction>
+            </xsd:simpleType>
+         </xsd:element>
+
          <xsd:element name="page-full-policy" maxOccurs="1" minOccurs="0">
             <xsd:annotation>
                <xsd:documentation>
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
index 6fc6996a77..eec20a5cad 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImplTest.java
@@ -101,6 +101,7 @@ import 
org.apache.activemq.artemis.core.server.routing.KeyType;
 import 
org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.DeletionPolicy;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.ResourceLimitSettings;
 import 
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
 import org.apache.activemq.artemis.jdbc.store.drivers.JDBCDataSourceUtils;
@@ -1704,6 +1705,7 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       properties.put("addressSettings.NeedToSet.defaultQueueRoutingType", 
RoutingType.ANYCAST);
       properties.put("addressSettings.NeedToSet.autoDeleteQueuesMessageCount", 
6789);
       properties.put("addressSettings.NeedToSet.addressFullMessagePolicy", 
AddressFullMessagePolicy.DROP);
+      properties.put("addressSettings.NeedToSet.diskFullMessagePolicy", 
DiskFullMessagePolicy.DROP);
       properties.put("addressSettings.NeedToSet.maxSizeBytes", 6666);
       properties.put("addressSettings.NeedToSet.redistributionDelay", 22);
       properties.put("addressSettings.NeedToSet.maxSizeBytesRejectThreshold", 
12334);
@@ -1773,6 +1775,7 @@ public class ConfigurationImplTest extends 
AbstractConfigurationTestBase {
       assertEquals(RoutingType.ANYCAST, 
configuration.getAddressSettings().get("NeedToSet").getDefaultQueueRoutingType());
       assertEquals(6789, 
configuration.getAddressSettings().get("NeedToSet").getAutoDeleteQueuesMessageCount());
       assertEquals(AddressFullMessagePolicy.DROP, 
configuration.getAddressSettings().get("NeedToSet").getAddressFullMessagePolicy());
+      assertEquals(DiskFullMessagePolicy.DROP, 
configuration.getAddressSettings().get("NeedToSet").getDiskFullMessagePolicy());
       assertEquals(6666, 
configuration.getAddressSettings().get("NeedToSet").getMaxSizeBytes());
       assertEquals(22, 
configuration.getAddressSettings().get("NeedToSet").getRedistributionDelay());
       assertEquals(12334, 
configuration.getAddressSettings().get("NeedToSet").getMaxSizeBytesRejectThreshold());
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
index 5461097752..53b07421d7 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationParserTest.java
@@ -48,6 +48,7 @@ import 
org.apache.activemq.artemis.core.config.ha.SharedStorePrimaryPolicyConfig
 import org.apache.activemq.artemis.core.deployers.impl.FileConfigurationParser;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
 import org.apache.activemq.artemis.tests.util.ServerTestBase;
 import org.apache.activemq.artemis.utils.ClassloadingUtil;
 import org.apache.activemq.artemis.utils.DefaultSensitiveStringCodec;
@@ -1068,4 +1069,23 @@ public class FileConfigurationParserTest extends 
ServerTestBase {
       assertEquals("#", match.getAddressMatch());
       assertEquals("myQueue", match.getQueueMatch());
    }
+
+   @Test
+   public void testParsingDiskFullPolicy() throws Exception {
+      String configStr = """
+         <configuration>
+            <address-settings>
+               <address-setting match="foo">
+                  <disk-full-policy>FAIL</disk-full-policy>
+               </address-setting>
+            </address-settings>
+         </configuration>""";
+
+      FileConfigurationParser parser = new FileConfigurationParser();
+      ByteArrayInputStream input = new 
ByteArrayInputStream(configStr.getBytes(StandardCharsets.UTF_8));
+
+      Configuration configuration = parser.parseMainConfig(input);
+      AddressSettings settings = configuration.getAddressSettings().get("foo");
+      assertEquals(DiskFullMessagePolicy.FAIL, 
settings.getDiskFullMessagePolicy());
+   }
 }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index b59bc1ccbb..43cf9d1896 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -80,6 +80,7 @@ import 
org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlug
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import 
org.apache.activemq.artemis.core.settings.impl.SlowConsumerThresholdMeasurementUnit;
 import 
org.apache.activemq.artemis.tests.extensions.parameterized.ParameterizedTestExtension;
@@ -519,6 +520,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
       assertEquals(2L, (long) 
configInstance.getAddressSettings().get("a1").getMinExpiryDelay());
       assertEquals(3L, (long) 
configInstance.getAddressSettings().get("a1").getMaxExpiryDelay());
       assertTrue(configInstance.getAddressSettings().get("a1").isNoExpiry());
+      assertEquals(DiskFullMessagePolicy.DROP, 
configInstance.getAddressSettings().get("a1").getDiskFullMessagePolicy());
       assertEquals(AddressSettings.DEFAULT_AUTO_CREATE_EXPIRY_RESOURCES, 
configInstance.getAddressSettings().get("a1").isAutoCreateExpiryResources());
       assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_PREFIX, 
configInstance.getAddressSettings().get("a1").getExpiryQueuePrefix());
       assertEquals(AddressSettings.DEFAULT_EXPIRY_QUEUE_SUFFIX, 
configInstance.getAddressSettings().get("a1").getExpiryQueueSuffix());
@@ -561,6 +563,7 @@ public class FileConfigurationTest extends 
AbstractConfigurationTestBase {
       assertEquals(-1L, (long) 
configInstance.getAddressSettings().get("a2").getMinExpiryDelay());
       assertEquals(-1L, (long) 
configInstance.getAddressSettings().get("a2").getMaxExpiryDelay());
       assertFalse(configInstance.getAddressSettings().get("a2").isNoExpiry());
+      
assertNull(configInstance.getAddressSettings().get("a2").getDiskFullMessagePolicy());
       
assertTrue(configInstance.getAddressSettings().get("a2").isAutoCreateDeadLetterResources());
       assertEquals("", 
configInstance.getAddressSettings().get("a2").getExpiryQueuePrefix().toString());
       assertEquals(".EXP", 
configInstance.getAddressSettings().get("a2").getExpiryQueueSuffix().toString());
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml 
b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index b9935e85bb..14b76645c8 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -619,6 +619,7 @@
             <min-expiry-delay>2</min-expiry-delay>
             <max-expiry-delay>3</max-expiry-delay>
             <no-expiry>true</no-expiry>
+            <disk-full-policy>DROP</disk-full-policy>
             <redelivery-delay>1</redelivery-delay>
             
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
             <max-size-bytes>817M</max-size-bytes>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index 217e42dba2..eb4e3d4577 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -22,6 +22,7 @@
       <min-expiry-delay>2</min-expiry-delay>
       <max-expiry-delay>3</max-expiry-delay>
       <no-expiry>true</no-expiry>
+      <disk-full-policy>DROP</disk-full-policy>
       <redelivery-delay>1</redelivery-delay>
       
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
       <max-size-bytes>817M</max-size-bytes>
diff --git 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
index 217e42dba2..eb4e3d4577 100644
--- 
a/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
+++ 
b/artemis-server/src/test/resources/ConfigurationTest-xinclude-schema-config-address-settings.xml
@@ -22,6 +22,7 @@
       <min-expiry-delay>2</min-expiry-delay>
       <max-expiry-delay>3</max-expiry-delay>
       <no-expiry>true</no-expiry>
+      <disk-full-policy>DROP</disk-full-policy>
       <redelivery-delay>1</redelivery-delay>
       
<redelivery-collision-avoidance-factor>0.5</redelivery-collision-avoidance-factor>
       <max-size-bytes>817M</max-size-bytes>
diff --git a/docs/user-manual/address-settings.adoc 
b/docs/user-manual/address-settings.adoc
index cbdf27484e..5b31140596 100644
--- a/docs/user-manual/address-settings.adoc
+++ b/docs/user-manual/address-settings.adoc
@@ -46,6 +46,7 @@ Here an example of an `address-setting` entry that might be 
found in the `broker
       <max-size-bytes-reject-threshold>-1</max-size-bytes-reject-threshold>
       <page-size-bytes>10MB</page-size-bytes>
       <address-full-policy>PAGE</address-full-policy>
+      <disk-full-policy>BLOCK</disk-full-policy>
       <message-counter-history-day-limit></message-counter-history-day-limit>
       <last-value-queue>false</last-value-queue> <!-- deprecated! see 
default-last-value-queue -->
       <default-last-value-queue>false</default-last-value-queue>
@@ -188,6 +189,15 @@ If the value is `FAIL` then further messages will be 
dropped and an exception wi
 If the value is `BLOCK` then client message producers will block when they try 
and send further messages.
 See the xref:flow-control.adoc#flow-control[Flow Control] and 
xref:paging.adoc#paging[Paging] chapters for more info.
 
+disk-full-policy::
+This attribute can have one of the following values: `DROP`, `FAIL` or `BLOCK` 
and determines what happens when a disk becomes full.
+The default value is `BLOCK`.
+If the value is `DROP` then further messages will be silently dropped.
+If the value is `FAIL` then further messages will be dropped and an exception 
will be thrown on the client-side.
+If the value is `BLOCK` then client message producers will block when they try 
and send further messages.
+See the xref:flow-control.adoc#flow-control[Flow Control], 
xref:paging.adoc#paging[Paging] and xref:paging.adoc#monitoring-disk[Monitoring 
Disk] chapters for more info.
+
+
 message-counter-history-day-limit::
 is the number of days to keep message counter history for this address 
assuming that `message-counter-enabled` is `true`.
 Default is `0`.
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/DiskFullSendPagingTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/DiskFullSendPagingTest.java
new file mode 100644
index 0000000000..cd21f6d768
--- /dev/null
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/paging/DiskFullSendPagingTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.paging;
+
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
+import org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DiskFullSendPagingTest extends ActiveMQTestBase { // Sends one message while the disk monitor is forced to report "full" and checks the client-side outcome for each DiskFullMessagePolicy.
+
+   @Test
+   public void testSendMessagesOnDiskFullWithDropPolicy() throws Exception {
+      ActiveMQServerImpl server = 
createServerWithDiskFullPolicy(DiskFullMessagePolicy.DROP);
+
+      server.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
+      server.getMonitor().tick(); // NOTE(review): presumably makes the monitor evaluate the new limit right away - confirm
+
+      CountDownLatch done = new CountDownLatch(1);
+      AtomicInteger errors = new AtomicInteger();
+
+      String address = getTestMethodName();
+
+      Thread t = new Thread(() -> { // send off-thread so the test can bound how long the send attempt takes
+         try {
+            trySendMessage(address);
+         } catch (Exception e) {
+            errors.incrementAndGet(); // any exception from connect, createQueue or send is counted
+         } finally {
+            done.countDown(); // signals that the send attempt finished, with or without an error
+         }
+      });
+
+      t.start();
+
+      assertTrue(done.await(200, TimeUnit.MILLISECONDS)); // DROP: the send attempt must finish within 200 ms, i.e. not block. NOTE(review): the 200 ms also covers connect + createQueue - may be tight on slow CI
+      assertEquals(0, errors.get()); // ...and no exception reaches the sender
+   }
+
+   @Test
+   public void testSendMessagesOnDiskFullWithFailPolicy() throws Exception {
+      ActiveMQServerImpl server = 
createServerWithDiskFullPolicy(DiskFullMessagePolicy.FAIL);
+
+      server.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
+      server.getMonitor().tick();
+
+      CountDownLatch done = new CountDownLatch(1);
+      AtomicInteger errors = new AtomicInteger();
+
+      String address = getTestMethodName();
+
+      Thread t = new Thread(() -> { // same off-thread send harness as the DROP test
+         try {
+            trySendMessage(address);
+         } catch (Exception e) {
+            errors.incrementAndGet();
+         } finally {
+            done.countDown();
+         }
+      });
+
+      t.start();
+
+      assertTrue(done.await(200, TimeUnit.MILLISECONDS)); // FAIL: the send attempt must also finish within 200 ms
+      assertEquals(1, errors.get()); // ...but with exactly one exception seen by the sender
+   }
+
+   @Test
+   public void testSendMessagesOnDiskFullWithBlockPolicy() throws Exception {
+      ActiveMQServerImpl server = 
createServerWithDiskFullPolicy(DiskFullMessagePolicy.BLOCK);
+
+      server.getMonitor().setMaxUsage(0); // forcing disk full (faking it)
+      server.getMonitor().tick();
+
+      CountDownLatch done = new CountDownLatch(1);
+      AtomicInteger errors = new AtomicInteger();
+
+      String address = getTestMethodName();
+
+      Thread t = new Thread(() -> { // same off-thread send harness as the DROP test
+         try {
+            trySendMessage(address);
+         } catch (Exception e) {
+            errors.incrementAndGet();
+         } finally {
+            done.countDown();
+         }
+      });
+
+      t.start();
+
+      assertFalse(done.await(200, TimeUnit.MILLISECONDS)); // BLOCK: the sender must still be waiting after 200 ms
+      assertEquals(0, errors.get()); // ...and must not have failed while waiting
+
+      server.getMonitor().setMaxUsage(1); // release the disk
+      server.getMonitor().tick();
+
+      assertTrue(done.await(200, TimeUnit.MILLISECONDS)); // once the limit is lifted the pending send attempt completes
+      assertEquals(0, errors.get()); // ...still without any exception
+   }
+
+   private ActiveMQServerImpl 
createServerWithDiskFullPolicy(DiskFullMessagePolicy policy) throws Exception {
+      ActiveMQServerImpl server = (ActiveMQServerImpl) createServer(true); // NOTE(review): 'true' presumably enables real files/persistence - confirm against ActiveMQTestBase
+      server.getAddressSettingsRepository().addMatch("#", new 
AddressSettings().setDiskFullMessagePolicy(policy)); // "#" presumably matches every address - confirm
+      server.start();
+
+      waitForServerToStart(server);
+      return server; // returned already started, with the requested policy registered
+   }
+
+   private void trySendMessage(String address) throws Exception {
+      try (ClientSessionFactory factory = 
createSessionFactory(createFactory(false))) {
+         try (ClientSession session = factory.createSession()) {
+            session.createQueue(QueueConfiguration.of(address));
+
+            ClientProducer producer = session.createProducer(address);
+
+            producer.send(createBytesMessage(session, 
ActiveMQBytesMessage.TYPE, new byte[1024], false)); // single message with a 1024-byte body; final 'false' is presumably the durable flag - confirm
+         }
+      }
+   }
+}
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
index 1e5c1d4ef6..78baaca17e 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java
@@ -63,6 +63,7 @@ import 
org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.DiskFullMessagePolicy;
 import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
 import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessagePersister;
 import org.apache.activemq.artemis.spi.core.protocol.MessagePersister;
@@ -1095,6 +1096,65 @@ public class PagingStoreImplTest extends 
ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testCheckMemoryOnDiskFull() throws Exception { // Exercises PagingStoreImpl.checkMemory under a scripted "disk full" signal for each disk-full policy (unset, BLOCK, FAIL, DROP).
+      SequentialFileFactory factory = new FakeSequentialFileFactory();
+
+      PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
+
+      PagingManager mockManager = Mockito.mock(FakePagingManager.class); // mocked so the isDiskFull() answers can be scripted below
+
+      ArtemisExecutor sameThreadExecutor = Runnable::run; // runs every task inline on the calling thread
+      PagingStoreImpl store = new 
PagingStoreImpl(PagingStoreImplTest.destinationTestName, 
scheduledExecutorService, 100,
+              mockManager, nullStorageManager, factory, storeFactory,
+              PagingStoreImplTest.destinationTestName,
+              new AddressSettings(), // fresh settings: no disk-full policy set by this test yet
+              sameThreadExecutor, true);
+
+      store.start();
+      try {
+         // isDiskFull called twice when no disk full message policy is set or 
when disk full message policy is BLOCK
+         Mockito.when(mockManager.isDiskFull()).thenReturn(
+                 true, true, // null
+                 true, true, // BLOCK
+                 true, // FAIL
+                 true, // DROP
+                 false
+         ); // consecutive stubbing: answers are consumed in order and the last one (false) repeats for all later calls
+
+         CountingRunnable trackMemoryCheck = new CountingRunnable(); // one counter shared by all four checkMemory calls below
+
+         // No disk full policy
+         assertTrue(store.checkMemory(trackMemoryCheck, null)); // call returns true...
+         assertEquals(0, trackMemoryCheck.getCount()); // ...but the runnable has not been executed
+
+         // Disk full policy BLOCK
+         store.applySetting(new 
AddressSettings().setDiskFullMessagePolicy(DiskFullMessagePolicy.BLOCK));
+
+         assertTrue(store.checkMemory(trackMemoryCheck, null)); // same outcome as with no policy set
+         assertEquals(0, trackMemoryCheck.getCount());
+
+         // Disk full policy FAIL
+         store.applySetting(new 
AddressSettings().setDiskFullMessagePolicy(DiskFullMessagePolicy.FAIL));
+
+         assertFalse(store.checkMemory(trackMemoryCheck, null)); // FAIL is the only policy here for which checkMemory returns false
+         assertEquals(0, trackMemoryCheck.getCount()); // runnable still not executed
+
+         // Disk full policy DROP
+         store.applySetting(new 
AddressSettings().setDiskFullMessagePolicy(DiskFullMessagePolicy.DROP));
+
+         assertTrue(store.checkMemory(trackMemoryCheck, null));
+         assertEquals(1, trackMemoryCheck.getCount()); // DROP: the runnable is executed during the call (count 0 -> 1)
+
+         // Release blocks
+         assertTrue(store.checkReleasedMemory()); // isDiskFull() now answers false
+         assertEquals(4, trackMemoryCheck.getCount()); // NOTE(review): 1 -> 4 means three more executions happen on release - presumably the runnables from the unset, BLOCK and FAIL calls; confirm against PagingStoreImpl
+
+      } finally {
+         store.stop();
+      }
+   }
+
    protected PagingManager createMockManager() {
       return new FakePagingManager();
    }
@@ -1263,7 +1323,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase 
{
          // Do another check, this time indicate the disk is full during the 
first couple
          // requests, making the task initially be retained for later but then 
executed.
          final CountingRunnable trackMemoryCheck2 = new CountingRunnable();
-         Mockito.when(mockManager.isDiskFull()).thenReturn(true, true, false);
+         Mockito.when(mockManager.isDiskFull()).thenReturn(true, false);
          assertEquals(0, trackMemoryCheck2.getCount());
          store.checkMemory(trackMemoryCheck2, null);
          assertEquals(1, trackMemoryCheck2.getCount());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact



Reply via email to