This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b3842de5cf Add guardrail for data disk usage
b3842de5cf is described below

commit b3842de5cf1fa1b81872effb4585fbc7e1873d59
Author: Andrés de la Peña <a.penya.gar...@gmail.com>
AuthorDate: Fri Apr 22 16:36:07 2022 +0100

    Add guardrail for data disk usage
    
    patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova and Stefan 
Miklosovic for CASSANDRA-17150
    
    Co-authored-by: Andrés de la Peña <a.penya.gar...@gmail.com>
    Co-authored-by: Zhao Yang <jasonstack.z...@gmail.com>
    Co-authored-by: Eduard Tudenhoefner <etudenhoef...@gmail.com>
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |  28 +
 conf/cassandra.yaml                                |  24 +-
 .../config/CassandraRelevantProperties.java        |   8 +
 src/java/org/apache/cassandra/config/Config.java   |  49 +-
 .../apache/cassandra/config/DataStorageSpec.java   |  13 +-
 .../apache/cassandra/config/GuardrailsOptions.java | 121 +++-
 .../org/apache/cassandra/cql3/QueryOptions.java    |   9 +-
 .../cassandra/cql3/selection/ResultSetBuilder.java |   5 +-
 .../cassandra/cql3/statements/BatchStatement.java  |   8 +-
 .../cql3/statements/ModificationStatement.java     |  25 +
 src/java/org/apache/cassandra/db/Directories.java  |   5 +
 src/java/org/apache/cassandra/db/ReadCommand.java  |   8 +-
 .../apache/cassandra/db/guardrails/Guardrail.java  |  92 ++-
 .../apache/cassandra/db/guardrails/Guardrails.java | 112 +++-
 .../cassandra/db/guardrails/GuardrailsConfig.java  |  25 +-
 .../cassandra/db/guardrails/GuardrailsMBean.java   |  61 +-
 .../db/guardrails/PercentageThreshold.java         |  56 ++
 .../apache/cassandra/db/guardrails/Predicates.java |  93 ++++
 .../apache/cassandra/db/guardrails/Threshold.java  |  20 +-
 .../org/apache/cassandra/gms/ApplicationState.java |   1 +
 .../org/apache/cassandra/gms/VersionedValue.java   |   5 +
 .../cassandra/io/sstable/format/SSTableWriter.java |   2 +-
 .../apache/cassandra/service/StorageService.java   |   2 +
 .../service/disk/usage/DiskUsageBroadcaster.java   | 181 ++++++
 .../service/disk/usage/DiskUsageMonitor.java       | 233 ++++++++
 .../service/disk/usage/DiskUsageState.java         |  70 +++
 .../test/guardrails/GuardrailDiskUsageTest.java    | 225 ++++++++
 .../cassandra/config/DataStorageSpecTest.java      |  29 +-
 .../db/guardrails/GuardrailCollectionSizeTest.java |  10 +-
 .../db/guardrails/GuardrailDiskUsageTest.java      | 617 +++++++++++++++++++++
 .../cassandra/db/guardrails/GuardrailTester.java   |  10 +
 .../cassandra/db/guardrails/GuardrailsTest.java    |  46 ++
 .../cassandra/db/guardrails/ThresholdTester.java   |  28 +-
 .../cassandra/db/virtual/GossipInfoTableTest.java  |   3 +-
 35 files changed, 2125 insertions(+), 100 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index a1213090e2..9e9e1ee2f1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Add guardrail for data disk usage (CASSANDRA-17150)
  * Tool to list data paths of existing tables (CASSANDRA-17568)
  * Migrate track_warnings to more standard naming conventions and use latest 
configuration types rather than long (CASSANDRA-17560)
  * Add support for CONTAINS and CONTAINS KEY in conditional UPDATE and DELETE 
statement (CASSANDRA-10537)
diff --git a/NEWS.txt b/NEWS.txt
index a891eb3a9a..fd31e06c93 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -56,6 +56,34 @@ using the provided 'sstableupgrade' tool.
 
 New features
 ------------
+    - Added a new guardrails framework that allows defining soft/hard limits for 
different user actions, such as limiting
+      the number of tables, columns per table or the size of collections. 
These guardrails are only applied to regular
+      user queries, and superusers and internal queries are excluded. Reaching 
the soft limit raises a client warning,
+      whereas reaching the hard limit aborts the query. In both cases a log 
message and a diagnostic event are emitted.
+      Additionally, some guardrails are not linked to specific user queries 
due to technical limitations, such as
+      detecting the size of large collections during compaction or 
periodically monitoring the disk usage. These
+      guardrails would only emit the proper logs and diagnostic events when 
triggered, without aborting any processes.
+      Guardrails config is defined through cassandra.yaml properties, and they 
can be dynamically updated through the
+      JMX MBean `org.apache.cassandra.db:type=Guardrails`. There are 
guardrails for:
+        - Number of user keyspaces.
+        - Number of user tables.
+        - Number of columns per table.
+        - Number of secondary indexes per table.
+        - Number of materialized views per table.
+        - Number of fields per user-defined type.
+        - Number of items in a collection.
+        - Number of partition keys selected by an IN restriction.
+        - Number of partition keys selected by the cartesian product of 
multiple IN restrictions.
+        - Allowed table properties.
+        - Allowed read consistency levels.
+        - Allowed write consistency levels.
+        - Collections size.
+        - Query page size.
+        - Data disk usage, defined either as a percentage or as an absolute 
size.
+        - Whether user-defined timestamps are allowed.
+        - Whether GROUP BY queries are allowed.
+        - Whether the creation of secondary indexes is allowed.
+        - Whether the creation of uncompressed tables is allowed.
     - Add support for the use of pure monotonic functions on the last 
attribute of the GROUP BY clause.
     - Add floor functions that can be use to group by time range.
     - Support for native transport rate limiting via 
native_transport_rate_limiting_enabled and 
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c455d45950..b28f4388f5 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -1653,9 +1653,9 @@ drop_compact_storage_enabled: false
 # of non-frozen collections there could be unaccounted parts of the collection 
on the sstables. This is done this way to
 # prevent read-before-write. The guardrail is also checked at sstable write 
time to detect large non-frozen collections,
 # although in that case exceeding the fail threshold will only log an error 
message, without interrupting the operation.
-# The two thresholds default to 0KiB to disable.
-# collection_size_warn_threshold: 0KiB
-# collection_size_fail_threshold: 0KiB
+# The two thresholds default to null to disable.
+# collection_size_warn_threshold:
+# collection_size_fail_threshold:
 # Guardrail to warn or fail when encountering more elements in collection than 
threshold.
 # At query time this guardrail is applied only to the collection fragment that 
is being writen, even though in the case
 # of non-frozen collections there could be unaccounted parts of the collection 
on the sstables. This is done this way to
@@ -1668,6 +1668,24 @@ drop_compact_storage_enabled: false
 # Default -1 to disable.
 # fields_per_udt_warn_threshold: -1
 # fields_per_udt_fail_threshold: -1
+# Guardrail to warn or fail when local data disk usage percentage exceeds 
threshold. Valid values are in [1, 100].
+# This is only used for the disks storing data directories, so it won't count 
any separate disks used for storing
+# the commitlog, hints or saved caches. The disk usage is the ratio between 
the amount of space used by the data
+# directories and the addition of that same space and the remaining free space 
on disk. The main purpose of this
+# guardrail is rejecting user writes when the disks are over the defined usage 
percentage, so the writes done by
+# background processes such as compaction and streaming don't fail due to a 
full disk. The limits should be defined
according to the expected data growth due to those background processes, 
so for example a compaction strategy
+# doubling the size of the data would require keeping the disk usage under 50%.
+# The two thresholds default to -1 to disable.
+# data_disk_usage_percentage_warn_threshold: -1
+# data_disk_usage_percentage_fail_threshold: -1
+# Allows defining the max disk size of the data directories when calculating 
thresholds for
+# data_disk_usage_percentage_warn_threshold and 
data_disk_usage_percentage_fail_threshold, so if this is greater than zero they
+# become percentages of a fixed size on disk instead of percentages of the 
physically available disk size. This should
+# be useful when we have a large disk and we only want to use a part of it for 
Cassandra's data directories.
+# Valid values are in [1, max available disk size of all data directories].
+# Defaults to null to disable and use the physically available disk size of 
data directories during calculations.
+# data_disk_usage_max_disk_size:
 
 # Startup Checks are executed as part of Cassandra startup process, not all of 
them
 # are configurable (so you can disable them) but these which are enumerated 
bellow.
diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 3b63cf8d03..72da3307dc 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.config;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.service.FileSystemOwnershipCheck;
 
@@ -265,6 +267,12 @@ public enum CassandraRelevantProperties
     TEST_IGNORE_SIGAR("cassandra.test.ignore_sigar", "false"),
     PAXOS_EXECUTE_ON_SELF("cassandra.paxos.use_self_execution", "true"),
 
+    /** property for the rate of the scheduled task that monitors disk usage */
+    DISK_USAGE_MONITOR_INTERVAL_MS("cassandra.disk_usage.monitor_interval_ms", 
Long.toString(TimeUnit.SECONDS.toMillis(30))),
+
+    /** property for the interval on which the repeated client warnings and 
diagnostic events about disk usage are ignored */
+    DISK_USAGE_NOTIFY_INTERVAL_MS("cassandra.disk_usage.notify_interval_ms", 
Long.toString(TimeUnit.MINUTES.toMillis(30))),
+
     // for specific tests
     
ORG_APACHE_CASSANDRA_CONF_CASSANDRA_RELEVANT_PROPERTIES_TEST("org.apache.cassandra.conf.CassandraRelevantPropertiesTest"),
     
ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"),
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index 58cfbfddfd..7f6c926d08 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -783,24 +783,22 @@ public class Config
     public volatile SubnetGroups client_error_reporting_exclusions = new 
SubnetGroups();
     public volatile SubnetGroups internode_error_reporting_exclusions = new 
SubnetGroups();
 
-    public static final int DISABLED_GUARDRAIL = -1;
-    public static final DataStorageSpec DISABLED_SIZE_GUARDRAIL = 
DataStorageSpec.inBytes(0);
-    public volatile int keyspaces_warn_threshold = DISABLED_GUARDRAIL;
-    public volatile int keyspaces_fail_threshold = DISABLED_GUARDRAIL;
-    public volatile int tables_warn_threshold = DISABLED_GUARDRAIL;
-    public volatile int tables_fail_threshold = DISABLED_GUARDRAIL;
-    public volatile int columns_per_table_warn_threshold = DISABLED_GUARDRAIL;
-    public volatile int columns_per_table_fail_threshold = DISABLED_GUARDRAIL;
-    public volatile int secondary_indexes_per_table_warn_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int secondary_indexes_per_table_fail_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int materialized_views_per_table_warn_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int materialized_views_per_table_fail_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int page_size_warn_threshold = DISABLED_GUARDRAIL;
-    public volatile int page_size_fail_threshold = DISABLED_GUARDRAIL;
-    public volatile int partition_keys_in_select_warn_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int partition_keys_in_select_fail_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int in_select_cartesian_product_warn_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int in_select_cartesian_product_fail_threshold = 
DISABLED_GUARDRAIL;
+    public volatile int keyspaces_warn_threshold = -1;
+    public volatile int keyspaces_fail_threshold = -1;
+    public volatile int tables_warn_threshold = -1;
+    public volatile int tables_fail_threshold = -1;
+    public volatile int columns_per_table_warn_threshold = -1;
+    public volatile int columns_per_table_fail_threshold = -1;
+    public volatile int secondary_indexes_per_table_warn_threshold = -1;
+    public volatile int secondary_indexes_per_table_fail_threshold = -1;
+    public volatile int materialized_views_per_table_warn_threshold = -1;
+    public volatile int materialized_views_per_table_fail_threshold = -1;
+    public volatile int page_size_warn_threshold = -1;
+    public volatile int page_size_fail_threshold = -1;
+    public volatile int partition_keys_in_select_warn_threshold = -1;
+    public volatile int partition_keys_in_select_fail_threshold = -1;
+    public volatile int in_select_cartesian_product_warn_threshold = -1;
+    public volatile int in_select_cartesian_product_fail_threshold = -1;
     public volatile Set<String> table_properties_warned = 
Collections.emptySet();
     public volatile Set<String> table_properties_ignored = 
Collections.emptySet();
     public volatile Set<String> table_properties_disallowed = 
Collections.emptySet();
@@ -814,12 +812,15 @@ public class Config
     public volatile boolean uncompressed_tables_enabled = true;
     public volatile boolean compact_tables_enabled = true;
     public volatile boolean read_before_write_list_operations_enabled = true;
-    public volatile DataStorageSpec collection_size_warn_threshold = 
DISABLED_SIZE_GUARDRAIL;
-    public volatile DataStorageSpec collection_size_fail_threshold = 
DISABLED_SIZE_GUARDRAIL;
-    public volatile int items_per_collection_warn_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int items_per_collection_fail_threshold = 
DISABLED_GUARDRAIL;
-    public volatile int fields_per_udt_warn_threshold = DISABLED_GUARDRAIL;
-    public volatile int fields_per_udt_fail_threshold = DISABLED_GUARDRAIL;
+    public volatile DataStorageSpec collection_size_warn_threshold = null;
+    public volatile DataStorageSpec collection_size_fail_threshold = null;
+    public volatile int items_per_collection_warn_threshold = -1;
+    public volatile int items_per_collection_fail_threshold = -1;
+    public volatile int fields_per_udt_warn_threshold = -1;
+    public volatile int fields_per_udt_fail_threshold = -1;
+    public volatile int data_disk_usage_percentage_warn_threshold = -1;
+    public volatile int data_disk_usage_percentage_fail_threshold = -1;
+    public volatile DataStorageSpec data_disk_usage_max_disk_size = null;
 
     public volatile DurationSpec streaming_state_expires = 
DurationSpec.inDays(3);
     public volatile DataStorageSpec streaming_state_size = 
DataStorageSpec.inMebibytes(40);
diff --git a/src/java/org/apache/cassandra/config/DataStorageSpec.java 
b/src/java/org/apache/cassandra/config/DataStorageSpec.java
index eeafe2ed8d..93224b3245 100644
--- a/src/java/org/apache/cassandra/config/DataStorageSpec.java
+++ b/src/java/org/apache/cassandra/config/DataStorageSpec.java
@@ -134,6 +134,17 @@ public class DataStorageSpec
         return new DataStorageSpec(mebibytes, MEBIBYTES);
     }
 
+    /**
+     * Creates a {@code DataStorageSpec} of the specified amount of gibibytes.
+     *
+     * @param gibibytes the amount of gibibytes
+     * @return a {@code DataStorageSpec}
+     */
+    public static DataStorageSpec inGibibytes(long gibibytes)
+    {
+        return new DataStorageSpec(gibibytes, GIBIBYTES);
+    }
+
     /**
      * @return the data storage unit.
      */
@@ -421,4 +432,4 @@ public class DataStorageSpec
             throw new AbstractMethodError();
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java 
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index b00075ab21..b73cc195bf 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -18,12 +18,16 @@
 
 package org.apache.cassandra.config;
 
+import java.math.BigInteger;
 import java.util.Collections;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
+import javax.annotation.Nullable;
+
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +35,7 @@ import 
org.apache.cassandra.cql3.statements.schema.TableAttributes;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
 
 import static java.lang.String.format;
 import static java.util.stream.Collectors.toSet;
@@ -74,9 +79,11 @@ public class GuardrailsOptions implements GuardrailsConfig
         config.read_consistency_levels_disallowed = 
validateConsistencyLevels(config.read_consistency_levels_disallowed, 
"read_consistency_levels_disallowed");
         config.write_consistency_levels_warned = 
validateConsistencyLevels(config.write_consistency_levels_warned, 
"write_consistency_levels_warned");
         config.write_consistency_levels_disallowed = 
validateConsistencyLevels(config.write_consistency_levels_disallowed, 
"write_consistency_levels_disallowed");
-        validateSizeThreshold(config.collection_size_warn_threshold, 
config.collection_size_fail_threshold, "collection_size");
+        validateSizeThreshold(config.collection_size_warn_threshold, 
config.collection_size_fail_threshold, false, "collection_size");
         validateIntThreshold(config.items_per_collection_warn_threshold, 
config.items_per_collection_fail_threshold, "items_per_collection");
         validateIntThreshold(config.fields_per_udt_warn_threshold, 
config.fields_per_udt_fail_threshold, "fields_per_udt");
+        
validatePercentageThreshold(config.data_disk_usage_percentage_warn_threshold, 
config.data_disk_usage_percentage_fail_threshold, "data_disk_usage_percentage");
+        validateDataDiskUsageMaxDiskSize(config.data_disk_usage_max_disk_size);
     }
 
     @Override
@@ -460,20 +467,23 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> 
config.write_consistency_levels_disallowed = x);
     }
 
+    @Override
+    @Nullable
     public DataStorageSpec getCollectionSizeWarnThreshold()
     {
         return config.collection_size_warn_threshold;
     }
 
     @Override
+    @Nullable
     public DataStorageSpec getCollectionSizeFailThreshold()
     {
         return config.collection_size_fail_threshold;
     }
 
-    public void setCollectionSizeThreshold(DataStorageSpec warn, 
DataStorageSpec fail)
+    public void setCollectionSizeThreshold(@Nullable DataStorageSpec warn, 
@Nullable DataStorageSpec fail)
     {
-        validateSizeThreshold(warn, fail, "collection_size");
+        validateSizeThreshold(warn, fail, false, "collection_size");
         updatePropertyWithLogging("collection_size_warn_threshold",
                                   warn,
                                   () -> config.collection_size_warn_threshold,
@@ -534,10 +544,49 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> config.fields_per_udt_fail_threshold = 
x);
     }
 
+    public int getDataDiskUsagePercentageWarnThreshold()
+    {
+        return config.data_disk_usage_percentage_warn_threshold;
+    }
+
+    @Override
+    public int getDataDiskUsagePercentageFailThreshold()
+    {
+        return config.data_disk_usage_percentage_fail_threshold;
+    }
+
+    public void setDataDiskUsagePercentageThreshold(int warn, int fail)
+    {
+        validatePercentageThreshold(warn, fail, "data_disk_usage_percentage");
+        updatePropertyWithLogging("data_disk_usage_percentage_warn_threshold",
+                                  warn,
+                                  () -> 
config.data_disk_usage_percentage_warn_threshold,
+                                  x -> 
config.data_disk_usage_percentage_warn_threshold = x);
+        updatePropertyWithLogging("data_disk_usage_percentage_fail_threshold",
+                                  fail,
+                                  () -> 
config.data_disk_usage_percentage_fail_threshold,
+                                  x -> 
config.data_disk_usage_percentage_fail_threshold = x);
+    }
+
+    @Override
+    public DataStorageSpec getDataDiskUsageMaxDiskSize()
+    {
+        return config.data_disk_usage_max_disk_size;
+    }
+
+    public void setDataDiskUsageMaxDiskSize(@Nullable DataStorageSpec diskSize)
+    {
+        validateDataDiskUsageMaxDiskSize(diskSize);
+        updatePropertyWithLogging("data_disk_usage_max_disk_size",
+                                  diskSize,
+                                  () -> config.data_disk_usage_max_disk_size,
+                                  x -> config.data_disk_usage_max_disk_size = 
x);
+    }
+
     private static <T> void updatePropertyWithLogging(String propertyName, T 
newValue, Supplier<T> getter, Consumer<T> setter)
     {
         T oldValue = getter.get();
-        if (!newValue.equals(oldValue))
+        if (newValue == null || !newValue.equals(oldValue))
         {
             setter.accept(newValue);
             logger.info("Updated {} from {} to {}", propertyName, oldValue, 
newValue);
@@ -546,7 +595,7 @@ public class GuardrailsOptions implements GuardrailsConfig
 
     private static void validatePositiveNumeric(long value, long maxValue, 
String name)
     {
-        if (value == Config.DISABLED_GUARDRAIL)
+        if (value == -1)
             return;
 
         if (value > maxValue)
@@ -555,14 +604,17 @@ public class GuardrailsOptions implements GuardrailsConfig
 
         if (value == 0)
             throw new IllegalArgumentException(format("Invalid value for %s: 0 
is not allowed; " +
-                                                      "if attempting to 
disable use %d",
-                                                      name, 
Config.DISABLED_GUARDRAIL));
+                                                      "if attempting to 
disable use -1", name));
 
         // We allow -1 as a general "disabling" flag. But reject anything 
lower to avoid mistakes.
         if (value <= 0)
             throw new IllegalArgumentException(format("Invalid value %d for 
%s: negative values are not allowed, " +
-                                                      "outside of %d which 
disables the guardrail",
-                                                      value, name, 
Config.DISABLED_GUARDRAIL));
+                                                      "outside of -1 which 
disables the guardrail", value, name));
+    }
+
+    private static void validatePercentage(long value, String name)
+    {
+        validatePositiveNumeric(value, 100, name);
     }
 
     private static void validateIntThreshold(int warn, int fail, String name)
@@ -572,9 +624,16 @@ public class GuardrailsOptions implements GuardrailsConfig
         validateWarnLowerThanFail(warn, fail, name);
     }
 
+    private static void validatePercentageThreshold(int warn, int fail, String 
name)
+    {
+        validatePercentage(warn, name + "_warn_threshold");
+        validatePercentage(fail, name + "_fail_threshold");
+        validateWarnLowerThanFail(warn, fail, name);
+    }
+
     private static void validateWarnLowerThanFail(long warn, long fail, String 
name)
     {
-        if (warn == Config.DISABLED_GUARDRAIL || fail == 
Config.DISABLED_GUARDRAIL)
+        if (warn == -1 || fail == -1)
             return;
 
         if (fail < warn)
@@ -582,9 +641,27 @@ public class GuardrailsOptions implements GuardrailsConfig
                                                       "than the fail threshold 
%d", warn, name, fail));
     }
 
-    private static void validateSizeThreshold(DataStorageSpec warn, 
DataStorageSpec fail, String name)
+    private static void validateSize(DataStorageSpec size, boolean allowZero, 
String name)
+    {
+        if (size == null)
+            return;
+
+        if (!allowZero && size.toBytes() == 0)
+            throw new IllegalArgumentException(format("Invalid value for %s: 0 
is not allowed; " +
+                                                      "if attempting to 
disable use an empty value",
+                                                      name));
+    }
+
+    private static void validateSizeThreshold(DataStorageSpec warn, 
DataStorageSpec fail, boolean allowZero, String name)
     {
-        if (warn.equals(Config.DISABLED_SIZE_GUARDRAIL) || 
fail.equals(Config.DISABLED_SIZE_GUARDRAIL))
+        validateSize(warn, allowZero, name + "_warn_threshold");
+        validateSize(fail, allowZero, name + "_fail_threshold");
+        validateWarnLowerThanFail(warn, fail, name);
+    }
+
+    private static void validateWarnLowerThanFail(DataStorageSpec warn, 
DataStorageSpec fail, String name)
+    {
+        if (warn == null || fail == null)
             return;
 
         if (fail.toBytes() < warn.toBytes())
@@ -615,4 +692,24 @@ public class GuardrailsOptions implements GuardrailsConfig
 
         return consistencyLevels.isEmpty() ? Collections.emptySet() : 
Sets.immutableEnumSet(consistencyLevels);
     }
+
+    private static void validateDataDiskUsageMaxDiskSize(DataStorageSpec 
maxDiskSize)
+    {
+        if (maxDiskSize == null)
+            return;
+
+        validateSize(maxDiskSize, false, "data_disk_usage_max_disk_size");
+
+        BigInteger diskSize = 
DiskUsageMonitor.dataDirectoriesGroupedByFileStore()
+                                              .keys()
+                                              .stream()
+                                              
.map(DiskUsageMonitor::totalSpace)
+                                              .map(BigInteger::valueOf)
+                                              .reduce(BigInteger.ZERO, 
BigInteger::add);
+
+        if (diskSize.compareTo(BigInteger.valueOf(maxDiskSize.toBytes())) < 0)
+            throw new IllegalArgumentException(format("Invalid value for 
data_disk_usage_max_disk_size: " +
+                                                      "%s specified, but only 
%s are actually available on disk",
+                                                      maxDiskSize, 
DataStorageSpec.inBytes(diskSize.longValue())));
+    }
 }
diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java 
b/src/java/org/apache/cassandra/cql3/QueryOptions.java
index 4193c4e15a..5fcaf06786 100644
--- a/src/java/org/apache/cassandra/cql3/QueryOptions.java
+++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java
@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
 
 import io.netty.buffer.ByteBuf;
 
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.schema.ColumnMetadata;
@@ -282,13 +281,13 @@ public abstract class QueryOptions
         @Override
         public long getCoordinatorReadSizeWarnThresholdBytes()
         {
-            return Config.DISABLED_GUARDRAIL;
+            return -1;
         }
 
         @Override
         public long getCoordinatorReadSizeFailThresholdBytes()
         {
-            return Config.DISABLED_GUARDRAIL;
+            return -1;
         }
     }
 
@@ -299,8 +298,8 @@ public abstract class QueryOptions
 
         public DefaultReadThresholds(DataStorageSpec warnThreshold, 
DataStorageSpec abortThreshold)
         {
-            this.warnThresholdBytes = warnThreshold == null ? 
Config.DISABLED_GUARDRAIL : warnThreshold.toBytes();
-            this.abortThresholdBytes = abortThreshold == null ? 
Config.DISABLED_GUARDRAIL : abortThreshold.toBytes();
+            this.warnThresholdBytes = warnThreshold == null ? -1 : 
warnThreshold.toBytes();
+            this.abortThresholdBytes = abortThreshold == null ? -1 : 
abortThreshold.toBytes();
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java 
b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
index b42357835a..22566b26d7 100644
--- a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
+++ b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java
@@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.cassandra.config.Config;
 import org.apache.cassandra.cql3.ResultSet;
 import org.apache.cassandra.cql3.ResultSet.ResultMetadata;
 import org.apache.cassandra.cql3.selection.Selection.Selectors;
@@ -76,7 +75,7 @@ public final class ResultSetBuilder
 
     public boolean shouldWarn(long thresholdBytes)
     {
-        if (thresholdBytes != Config.DISABLED_GUARDRAIL &&!sizeWarningEmitted 
&& size > thresholdBytes)
+        if (thresholdBytes != -1 &&!sizeWarningEmitted && size > 
thresholdBytes)
         {
             sizeWarningEmitted = true;
             return true;
@@ -86,7 +85,7 @@ public final class ResultSetBuilder
 
     public boolean shouldReject(long thresholdBytes)
     {
-        return thresholdBytes != Config.DISABLED_GUARDRAIL && size > 
thresholdBytes;
+        return thresholdBytes != -1 && size > thresholdBytes;
     }
 
     public long getSize()
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 89ece02c4d..61e4934864 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -411,8 +411,12 @@ public class BatchStatement implements CQLStatement
         if (options.getSerialConsistency() == null)
             throw new InvalidRequestException("Invalid empty serial 
consistency level");
 
+        ClientState clientState = queryState.getClientState();
         
Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(), 
options.getSerialConsistency()),
-                                                queryState.getClientState());
+                                                clientState);
+
+        for (int i = 0; i < statements.size(); i++ )
+            statements.get(i).validateDiskUsage(options.forStatement(i), 
clientState);
 
         if (hasConditions)
             return executeWithConditions(options, queryState, 
queryStartNanoTime);
@@ -420,7 +424,7 @@ public class BatchStatement implements CQLStatement
         if (updatesVirtualTables)
             executeInternalWithoutCondition(queryState, options, 
queryStartNanoTime);
         else    
-            executeWithoutConditions(getMutations(queryState.getClientState(), 
options, false, timestamp, nowInSeconds, queryStartNanoTime),
+            executeWithoutConditions(getMutations(clientState, options, false, 
timestamp, nowInSeconds, queryStartNanoTime),
                                      options.getConsistency(), 
queryStartNanoTime);
 
         return new ResultMessage.Void();
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 354f739bb1..ab36ec971b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.auth.Permission;
 import org.apache.cassandra.db.guardrails.Guardrails;
 import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableMetadata;
@@ -51,6 +54,7 @@ import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
 import org.apache.cassandra.service.paxos.Ballot;
 import org.apache.cassandra.service.paxos.BallotGenerator;
 import org.apache.cassandra.service.paxos.Commit.Proposal;
@@ -277,6 +281,25 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
             Guardrails.userTimestampsEnabled.ensureEnabled(state);
     }
 
+    public void validateDiskUsage(QueryOptions options, ClientState state)
+    {
+        // reject writes if any replica exceeds disk usage failure limit or 
warn if it exceeds warn limit
+        if (Guardrails.replicaDiskUsage.enabled(state) && 
DiskUsageBroadcaster.instance.hasStuffedOrFullNode())
+        {
+            Keyspace keyspace = Keyspace.open(keyspace());
+
+            for (ByteBuffer key : buildPartitionKeyNames(options, state))
+            {
+                Token token = metadata().partitioner.getToken(key);
+
+                for (Replica replica : 
ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token).all())
+                {
+                    Guardrails.replicaDiskUsage.guard(replica.endpoint(), 
state);
+                }
+            }
+        }
+    }
+
     public RegularAndStaticColumns updatedColumns()
     {
         return updatedColumns;
@@ -480,6 +503,8 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
         else
             cl.validateForWrite();
 
+        validateDiskUsage(options, queryState.getClientState());
+
         List<? extends IMutation> mutations =
             getMutations(queryState.getClientState(),
                          options,
diff --git a/src/java/org/apache/cassandra/db/Directories.java 
b/src/java/org/apache/cassandra/db/Directories.java
index bcef41bc7d..972ba6d3cd 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -640,6 +640,11 @@ public class Directories
             return availableSpace > 0 ? availableSpace : 0;
         }
 
+        public long getRawSize()
+        {
+            return FileUtils.folderSize(location);
+        }
+
         @Override
         public boolean equals(Object o)
         {
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index 1efb086b40..ef70588a45 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -666,8 +666,8 @@ public abstract class ReadCommand extends AbstractReadQuery
         DataStorageSpec failThreshold = 
DatabaseDescriptor.getLocalReadSizeFailThreshold();
         if (!shouldTrackSize(warnThreshold, failThreshold))
             return iterator;
-        final long warnBytes = warnThreshold == null ? 
Config.DISABLED_GUARDRAIL : warnThreshold.toBytes();
-        final long failBytes = failThreshold == null ? 
Config.DISABLED_GUARDRAIL : failThreshold.toBytes();
+        final long warnBytes = warnThreshold == null ? -1 : 
warnThreshold.toBytes();
+        final long failBytes = failThreshold == null ? -1 : 
failThreshold.toBytes();
         class QuerySizeTracking extends Transformation<UnfilteredRowIterator>
         {
             private long sizeInBytes = 0;
@@ -709,7 +709,7 @@ public abstract class ReadCommand extends AbstractReadQuery
             private void addSize(long size)
             {
                 this.sizeInBytes += size;
-                if (failBytes != Config.DISABLED_GUARDRAIL && this.sizeInBytes 
>= failBytes)
+                if (failBytes != -1 && this.sizeInBytes >= failBytes)
                 {
                     String msg = String.format("Query %s attempted to read %d 
bytes but max allowed is %s; query aborted  (see 
local_read_size_fail_threshold)",
                                                ReadCommand.this.toCQLString(), 
this.sizeInBytes, failThreshold);
@@ -718,7 +718,7 @@ public abstract class ReadCommand extends AbstractReadQuery
                     MessageParams.add(ParamType.LOCAL_READ_SIZE_FAIL, 
this.sizeInBytes);
                     throw new LocalReadSizeTooLargeException(msg);
                 }
-                else if (warnBytes != Config.DISABLED_GUARDRAIL && 
this.sizeInBytes >= warnBytes)
+                else if (warnBytes != -1 && this.sizeInBytes >= warnBytes)
                 {
                     MessageParams.add(ParamType.LOCAL_READ_SIZE_WARN, 
this.sizeInBytes);
                 }
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
index 6609760109..c058f10784 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java
@@ -29,6 +29,7 @@ import 
org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 /**
@@ -50,11 +51,32 @@ public abstract class Guardrail
     /** A name identifying the guardrail (mainly for shipping with diagnostic 
events). */
     public final String name;
 
+    /** Minimum logging and triggering interval to avoid spamming downstream. 
*/
+    private long minNotifyIntervalInMs = 0;
+
+    /** Time of last warning in milliseconds. */
+    private volatile long lastWarnInMs = 0;
+
+    /** Time of last failure in milliseconds. */
+    private volatile long lastFailInMs = 0;
+
     Guardrail(String name)
     {
         this.name = name;
     }
 
+    /**
+     * Checks whether this guardrail is enabled or not when the check is done 
for a background operation that is not
+     * associated to a specific {@link ClientState}, such as compaction or 
other background processes. Operations that
+     * are associated to a {@link ClientState}, such as CQL queries, should 
use {@link Guardrail#enabled(ClientState)}.
+     *
+     * @return {@code true} if this guardrail is enabled, {@code false} 
otherwise.
+     */
+    public boolean enabled()
+    {
+        return enabled(null);
+    }
+
     /**
      * Checks whether this guardrail is enabled or not. This will be enabled 
if the database is initialized and the
      * authenticated user (if specified) is not system nor superuser.
@@ -75,6 +97,9 @@ public abstract class Guardrail
 
     protected void warn(String message, String redactedMessage)
     {
+        if (skipNotifying(true))
+            return;
+
         message = decorateMessage(message);
 
         logger.warn(message);
@@ -95,13 +120,16 @@ public abstract class Guardrail
     {
         message = decorateMessage(message);
 
-        logger.error(message);
-        // Note that ClientWarn will simply ignore the message if we're not 
running this as part of a user query
-        // (the internal "state" will be null)
-        ClientWarn.instance.warn(message);
-        // Similarly, tracing will also ignore the message if we're not 
running tracing on the current thread.
-        Tracing.trace(message);
-        GuardrailsDiagnostics.failed(name, decorateMessage(redactedMessage));
+        if (!skipNotifying(false))
+        {
+            logger.error(message);
+            // Note that ClientWarn will simply ignore the message if we're 
not running this as part of a user query
+            // (the internal "state" will be null)
+            ClientWarn.instance.warn(message);
+            // Similarly, tracing will also ignore the message if we're not 
running tracing on the current thread.
+            Tracing.trace(message);
+            GuardrailsDiagnostics.failed(name, 
decorateMessage(redactedMessage));
+        }
 
         if (state != null)
             throw new GuardrailViolatedException(message);
@@ -113,4 +141,54 @@ public abstract class Guardrail
         // Add a prefix to error message so user knows what threw the warning 
or cause the failure
         return String.format("Guardrail %s violated: %s", name, message);
     }
+
+    /**
+     * Note: this method is not thread safe and should only be used during 
guardrail initialization
+     *
+     * @param minNotifyIntervalInMs minimum interval in milliseconds between 
notifications (logging and triggering listeners), to avoid spamming;
+     *                              default 0 means always log and trigger 
listeners.
+     * @return current guardrail
+     */
+    Guardrail minNotifyIntervalInMs(long minNotifyIntervalInMs)
+    {
+        assert minNotifyIntervalInMs >= 0;
+        this.minNotifyIntervalInMs = minNotifyIntervalInMs;
+        return this;
+    }
+
+    /**
+     * reset last notify time to make sure it will notify downstream when 
{@link #warn(String, String)}
+     * or {@link #fail(String, ClientState)} is called next time.
+     */
+    @VisibleForTesting
+    void resetLastNotifyTime()
+    {
+        lastFailInMs = 0;
+        lastWarnInMs = 0;
+    }
+
+    /**
+     * @return true if guardrail should not log message and trigger listeners; 
otherwise, update lastWarnInMs or
+     * lastFailInMs respectively.
+     */
+    private boolean skipNotifying(boolean isWarn)
+    {
+        if (minNotifyIntervalInMs == 0)
+            return false;
+
+        long nowInMs = Clock.Global.currentTimeMillis();
+        long timeElapsedInMs = nowInMs - (isWarn ? lastWarnInMs : 
lastFailInMs);
+
+        boolean skip = timeElapsedInMs < minNotifyIntervalInMs;
+
+        if (!skip)
+        {
+            if (isWarn)
+                lastWarnInMs = nowInMs;
+            else
+                lastFailInMs = nowInMs;
+        }
+
+        return skip;
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index cc54b279a4..d7ec81e83b 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -22,15 +22,19 @@ import java.util.Collections;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.GuardrailsOptions;
 import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
 import org.apache.cassandra.utils.MBeanWrapper;
 
 import static java.lang.String.format;
@@ -42,7 +46,7 @@ public final class Guardrails implements GuardrailsMBean
 {
     public static final String MBEAN_NAME = 
"org.apache.cassandra.db:type=Guardrails";
 
-    private static final GuardrailsConfigProvider CONFIG_PROVIDER = 
GuardrailsConfigProvider.instance;
+    public static final GuardrailsConfigProvider CONFIG_PROVIDER = 
GuardrailsConfigProvider.instance;
     private static final GuardrailsOptions DEFAULT_CONFIG = 
DatabaseDescriptor.getGuardrailsConfig();
 
     @VisibleForTesting
@@ -201,11 +205,11 @@ public final class Guardrails implements GuardrailsMBean
                   state -> 
CONFIG_PROVIDER.getOrCreate(state).getInSelectCartesianProductWarnThreshold(),
                   state -> 
CONFIG_PROVIDER.getOrCreate(state).getInSelectCartesianProductFailThreshold(),
                   (isWarning, what, value, threshold) ->
-                  isWarning ? format("The cartesian product of the IN 
restrictions on %s produces %d values, " +
+                  isWarning ? format("The cartesian product of the IN 
restrictions on %s produces %s values, " +
                                      "this exceeds warning threshold of %s.",
                                      what, value, threshold)
                             : format("Aborting query because the cartesian 
product of the IN restrictions on %s " +
-                                     "produces %d values, this exceeds fail 
threshold of %s.",
+                                     "produces %s values, this exceeds fail 
threshold of %s.",
                                      what, value, threshold));
 
     /**
@@ -233,8 +237,8 @@ public final class Guardrails implements GuardrailsMBean
      */
     public static final Threshold collectionSize =
     new Threshold("collection_size",
-                  state -> 
CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold().toBytes(),
-                  state -> 
CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold().toBytes(),
+                  state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold()),
+                  state -> 
sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold()),
                   (isWarning, what, value, threshold) ->
                   isWarning ? format("Detected collection %s of size %s, this 
exceeds the warning threshold of %s.",
                                      what, value, threshold)
@@ -267,6 +271,42 @@ public final class Guardrails implements GuardrailsMBean
                             : format("User types cannot have more than %s 
columns, but %s provided for user type %s.",
                                      threshold, value, what));
 
+    /**
+     * Guardrail on the data disk usage on the local node, used by a periodic 
task to calculate and propagate that status.
+     * See {@link org.apache.cassandra.service.disk.usage.DiskUsageMonitor} 
and {@link DiskUsageBroadcaster}.
+     */
+    public static final PercentageThreshold localDataDiskUsage =
+    new PercentageThreshold("local_data_disk_usage",
+                            state -> 
CONFIG_PROVIDER.getOrCreate(state).getDataDiskUsagePercentageWarnThreshold(),
+                            state -> 
CONFIG_PROVIDER.getOrCreate(state).getDataDiskUsagePercentageFailThreshold(),
+                            (isWarning, what, value, threshold) ->
+                            isWarning ? format("Local data disk usage %s(%s) 
exceeds warning threshold of %s",
+                                               value, what, threshold)
+                                      : format("Local data disk usage %s(%s) 
exceeds failure threshold of %s, " +
+                                               "will stop accepting writes",
+                                               value, what, threshold));
+
+    /**
+     * Guardrail on the data disk usage on replicas, used at write time to 
verify the status of the involved replicas.
+     * See {@link org.apache.cassandra.service.disk.usage.DiskUsageMonitor} 
and {@link DiskUsageBroadcaster}.
+     */
+    public static final Predicates<InetAddressAndPort> replicaDiskUsage =
+    new Predicates<>("replica_disk_usage",
+                     state -> DiskUsageBroadcaster.instance::isStuffed,
+                     state -> DiskUsageBroadcaster.instance::isFull,
+                     // not using `value` because it represents replica 
address which should be hidden from client.
+                     (isWarning, value) ->
+                     isWarning ? "Replica disk usage exceeds warning threshold"
+                               : "Write request failed because disk usage 
exceeds failure threshold");
+
+    static
+    {
+        // Avoid spamming with notifications about stuffed/full disks
+        long minNotifyInterval = 
CassandraRelevantProperties.DISK_USAGE_NOTIFY_INTERVAL_MS.getLong();
+        localDataDiskUsage.minNotifyIntervalInMs(minNotifyInterval);
+        replicaDiskUsage.minNotifyIntervalInMs(minNotifyInterval);
+    }
+
     private Guardrails()
     {
         MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
@@ -557,22 +597,24 @@ public final class Guardrails implements GuardrailsMBean
         DEFAULT_CONFIG.setPartitionKeysInSelectThreshold(warn, fail);
     }
 
-    public long getCollectionSizeWarnThresholdInKiB()
+    @Override
+    @Nullable
+    public String getCollectionSizeWarnThreshold()
     {
-        return DEFAULT_CONFIG.getCollectionSizeWarnThreshold().toKibibytes();
+        return sizeToString(DEFAULT_CONFIG.getCollectionSizeWarnThreshold());
     }
 
     @Override
-    public long getCollectionSizeFailThresholdInKiB()
+    @Nullable
+    public String getCollectionSizeFailThreshold()
     {
-        return DEFAULT_CONFIG.getCollectionSizeFailThreshold().toKibibytes();
+        return sizeToString(DEFAULT_CONFIG.getCollectionSizeFailThreshold());
     }
 
     @Override
-    public void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB)
+    public void setCollectionSizeThreshold(@Nullable String warnSize, 
@Nullable String failSize)
     {
-        
DEFAULT_CONFIG.setCollectionSizeThreshold(DataStorageSpec.inKibibytes(warnInKiB),
-                                                  
DataStorageSpec.inKibibytes(failInKiB));
+        DEFAULT_CONFIG.setCollectionSizeThreshold(sizeFromString(warnSize), 
sizeFromString(failSize));
     }
 
     @Override
@@ -725,6 +767,37 @@ public final class Guardrails implements GuardrailsMBean
         DEFAULT_CONFIG.setFieldsPerUDTThreshold(warn, fail);
     }
 
+    @Override
+    public int getDataDiskUsagePercentageWarnThreshold()
+    {
+        return DEFAULT_CONFIG.getDataDiskUsagePercentageWarnThreshold();
+    }
+
+    @Override
+    public int getDataDiskUsagePercentageFailThreshold()
+    {
+        return DEFAULT_CONFIG.getDataDiskUsagePercentageFailThreshold();
+    }
+
+    @Override
+    public void setDataDiskUsagePercentageThreshold(int warn, int fail)
+    {
+        DEFAULT_CONFIG.setDataDiskUsagePercentageThreshold(warn, fail);
+    }
+
+    @Override
+    @Nullable
+    public String getDataDiskUsageMaxDiskSize()
+    {
+        return sizeToString(DEFAULT_CONFIG.getDataDiskUsageMaxDiskSize());
+    }
+
+    @Override
+    public void setDataDiskUsageMaxDiskSize(@Nullable String size)
+    {
+        DEFAULT_CONFIG.setDataDiskUsageMaxDiskSize(sizeFromString(size));
+    }
+
     private static String toCSV(Set<String> values)
     {
         return values == null || values.isEmpty() ? "" : String.join(",", 
values);
@@ -758,4 +831,19 @@ public final class Guardrails implements GuardrailsMBean
             return null;
         return 
set.stream().map(ConsistencyLevel::valueOf).collect(Collectors.toSet());
     }
+
+    private static Long sizeToBytes(@Nullable DataStorageSpec size)
+    {
+        return size == null ? -1 : size.toBytes();
+    }
+
+    private static String sizeToString(@Nullable DataStorageSpec size)
+    {
+        return size == null ? null : size.toString();
+    }
+
+    private static DataStorageSpec sizeFromString(@Nullable String size)
+    {
+        return StringUtils.isEmpty(size) ? null : new DataStorageSpec(size);
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index 08b3d5677b..562509b7dd 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -20,6 +20,8 @@ package org.apache.cassandra.db.guardrails;
 
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
 import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.db.ConsistencyLevel;
 
@@ -200,14 +202,16 @@ public interface GuardrailsConfig
      */
     Set<ConsistencyLevel> getWriteConsistencyLevelsDisallowed();
 
-    /*
+    /**
      * @return The threshold to warn when encountering a collection with 
larger data size than threshold.
      */
+    @Nullable
     DataStorageSpec getCollectionSizeWarnThreshold();
 
     /**
      * @return The threshold to prevent collections with larger data size than 
threshold.
      */
+    @Nullable
     DataStorageSpec getCollectionSizeFailThreshold();
 
     /**
@@ -229,4 +233,23 @@ public interface GuardrailsConfig
      * @return The threshold to fail when creating a UDT with more fields than 
threshold.
      */
     int getFieldsPerUDTFailThreshold();
+
+    /**
+     * @return The threshold to warn when local disk usage percentage exceeds 
that threshold.
+     * Allowed values are in the range {@code [1, 100]}, and -1 means disabled.
+     */
+    int getDataDiskUsagePercentageWarnThreshold();
+
+    /**
+     * @return The threshold to fail when local disk usage percentage exceeds 
that threshold.
+     * Allowed values are in the range {@code [1, 100]}, and -1 means disabled.
+     */
+    int getDataDiskUsagePercentageFailThreshold();
+
+    /**
+     * @return The max disk size of the data directories when calculating disk 
usage thresholds, {@code null} means
+     * disabled.
+     */
+    @Nullable
+    DataStorageSpec getDataDiskUsageMaxDiskSize();
 }
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index 771db67ce6..215b953b89 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.db.guardrails;
 
 import java.util.Set;
+import javax.annotation.Nullable;
 
 /**
  * JMX entrypoint for updating the default guardrails configuration parsed 
from {@code cassandra.yaml}.
@@ -110,6 +111,7 @@ public interface GuardrailsMBean
 
     /**
      * Enables or disables the ability to create secondary indexes
+     *
      * @param enabled
      */
     void setSecondaryIndexesEnabled(boolean enabled);
@@ -401,20 +403,30 @@ public interface GuardrailsMBean
     void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels);
 
     /**
-     * @return The threshold to warn when encountering larger size of 
collection data than threshold, in KiB.
+     * @return The threshold to warn when encountering larger size of 
collection data than threshold, as a string
+     * formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 
30KiB} or {@code 40B}.  A {@code null} value
+     * means that the threshold is disabled.
      */
-    long getCollectionSizeWarnThresholdInKiB();
+    @Nullable
+    String getCollectionSizeWarnThreshold();
 
     /**
-     * @return The threshold to prevent collections with larger data size than 
threshold, in KiB.
+     * @return The threshold to prevent collections with larger data size than 
threshold, as a string formatted as in,
+     * for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 
40B}. A {@code null} value means that the
+     * threshold is disabled.
      */
-    long getCollectionSizeFailThresholdInKiB();
+    @Nullable
+    String getCollectionSizeFailThreshold();
 
     /**
-     * @param warnInKiB The threshold to warn when encountering larger size of 
collection data than threshold, in KiB.
-     * @param failInKiB The threshold to prevent collections with larger data 
size than threshold, in KiB.
+     * @param warnSize The threshold to warn when encountering larger size of 
collection data than threshold, as a
+     *                 string formatted as in, for example, {@code 10GiB}, 
{@code 20MiB}, {@code 30KiB} or {@code 40B}.
+     *                 A {@code null} value means disabled.
+     * @param failSize The threshold to prevent collections with larger data 
size than threshold, as a string formatted
+     *                 as in, for example, {@code 10GiB}, {@code 20MiB}, 
{@code 30KiB} or {@code 40B}. A {@code null}
+     *                 value means disabled.
      */
-    void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB);
+    void setCollectionSizeThreshold(@Nullable String warnSize, @Nullable 
String failSize);
 
     /**
      * @return The threshold to warn when encountering more elements in a 
collection than threshold.
@@ -447,4 +459,39 @@ public interface GuardrailsMBean
      * @param fail The threshold to prevent creating a UDT with more fields 
than threshold. -1 means disabled.
      */
     void setFieldsPerUDTThreshold(int warn, int fail);
+
+    /**
+     * @return The threshold to warn when local data disk usage percentage 
exceeds that threshold.
+     * Allowed values are in the range {@code [1, 100]}, and -1 means disabled.
+     */
+    int getDataDiskUsagePercentageWarnThreshold();
+
+    /**
+     * @return The threshold to fail when local data disk usage percentage 
exceeds that threshold.
+     * Allowed values are in the range {@code [1, 100]}, and -1 means disabled.
+     */
+    int getDataDiskUsagePercentageFailThreshold();
+
+    /**
+     * @param warn The threshold to warn when local disk usage percentage 
exceeds that threshold.
+     *             Allowed values are in the range {@code [1, 100]}, and -1 
means disabled.
+     * @param fail The threshold to fail when local disk usage percentage 
exceeds that threshold.
+     *             Allowed values are in the range {@code [1, 100]}, and -1 
means disabled.
+     */
+    public void setDataDiskUsagePercentageThreshold(int warn, int fail);
+
+    /**
+     * @return The max disk size of the data directories when calculating disk 
usage thresholds, as a string formatted
+     * as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or 
{@code 40B}. A {@code null} value means
+     * disabled.
+     */
+    @Nullable
+    String getDataDiskUsageMaxDiskSize();
+
+    /**
+     * @param size The max disk size of the data directories when calculating 
disk usage thresholds, as a string
+     *             formatted as in, for example, {@code 10GiB}, {@code 20MiB}, 
{@code 30KiB} or {@code 40B}.
+     *             A {@code null} value means disabled.
+     */
+    void setDataDiskUsageMaxDiskSize(@Nullable String size);
 }
diff --git 
a/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java 
b/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java
new file mode 100644
index 0000000000..c08d02641b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.util.function.ToLongFunction;
+
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * A {@link Threshold} guardrail whose values represent a percentage
+ * <p>
+ * This works exactly as a {@link Threshold}, but provides slightly more 
convenient error messages for percentages.
+ */
+public class PercentageThreshold extends Threshold
+{
+    /**
+     * Creates a new threshold guardrail.
+     *
+     * @param name            the identifying name of the guardrail
+     * @param warnThreshold   a {@link ClientState}-based provider of the 
value above which a warning should be triggered.
+     * @param failThreshold   a {@link ClientState}-based provider of the 
value above which the operation should be aborted.
+     * @param messageProvider a function to generate the warning or error 
message if the guardrail is triggered
+     */
+    public PercentageThreshold(String name,
+                               ToLongFunction<ClientState> warnThreshold,
+                               ToLongFunction<ClientState> failThreshold,
+                               ErrorMessageProvider messageProvider)
+    {
+        super(name, warnThreshold, failThreshold, messageProvider);
+    }
+
+    @Override
+    protected String errMsg(boolean isWarning, String what, long value, long 
thresholdValue)
+    {
+        return messageProvider.createMessage(isWarning,
+                                             what,
+                                             String.format("%d%%", value),
+                                             String.format("%d%%", 
thresholdValue));
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/guardrails/Predicates.java 
b/src/java/org/apache/cassandra/db/guardrails/Predicates.java
new file mode 100644
index 0000000000..13be9e9302
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/guardrails/Predicates.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.util.function.Function;
+import java.util.function.Predicate;
+import javax.annotation.Nullable;
+
+import org.apache.cassandra.service.ClientState;
+
+/**
+ * A guardrail based on two predicates.
+ *
+ * <p>A {@link Predicates} guardrail defines (up to) 2 predicates, one at 
which a warning is issued, and another one
+ * at which a failure is triggered. If failure is triggered, warning is 
skipped.
+ *
+ * @param <T> the type of the values to be tested against predicates.
+ */
+public class Predicates<T> extends Guardrail
+{
+    private final Function<ClientState, Predicate<T>> warnPredicate;
+    private final Function<ClientState, Predicate<T>> failurePredicate;
+    private final MessageProvider<T> messageProvider;
+
+    /**
+     * A function used to build the warning or error message of a triggered 
{@link Predicates} guardrail.
+     */
+    public interface MessageProvider<T>
+    {
+        /**
+         * Called when the guardrail is triggered to build the corresponding 
message.
+         *
+         * @param isWarning whether the trigger is a warning one; otherwise it 
is a failure one.
+         * @param value     the value that triggers the guardrail.
+         */
+        String createMessage(boolean isWarning, T value);
+    }
+
+    /**
+     * Creates a new {@link Predicates} guardrail.
+     *
+     * @param name             the identifying name of the guardrail
+     * @param warnPredicate    a {@link ClientState}-based predicate provider 
that is used to check if given value should trigger a warning.
+     * @param failurePredicate a {@link ClientState}-based predicate provider 
that is used to check if given value should trigger a failure.
+     * @param messageProvider  a function to generate the warning or error 
message if the guardrail is triggered
+     */
+    Predicates(String name,
+               Function<ClientState, Predicate<T>> warnPredicate,
+               Function<ClientState, Predicate<T>> failurePredicate,
+               MessageProvider<T> messageProvider)
+    {
+        super(name);
+        this.warnPredicate = warnPredicate;
+        this.failurePredicate = failurePredicate;
+        this.messageProvider = messageProvider;
+    }
+
+    /**
+     * Apply the guardrail to the provided value, triggering a warning or 
failure if appropriate.
+     *
+     * @param value the value to check.
+     */
+    public void guard(T value, @Nullable ClientState state)
+    {
+        if (!enabled(state))
+            return;
+
+        if (failurePredicate.apply(state).test(value))
+        {
+            fail(messageProvider.createMessage(false, value), state);
+        }
+        else if (warnPredicate.apply(state).test(value))
+        {
+            warn(messageProvider.createMessage(true, value));
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/guardrails/Threshold.java 
b/src/java/org/apache/cassandra/db/guardrails/Threshold.java
index f88d1e0cbe..f7e4823448 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Threshold.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Threshold.java
@@ -35,7 +35,7 @@ public class Threshold extends Guardrail
 {
     private final ToLongFunction<ClientState> warnThreshold;
     private final ToLongFunction<ClientState> failThreshold;
-    private final ErrorMessageProvider messageProvider;
+    protected final ErrorMessageProvider messageProvider;
 
     /**
      * Creates a new threshold guardrail.
@@ -56,12 +56,12 @@ public class Threshold extends Guardrail
         this.messageProvider = messageProvider;
     }
 
-    private String errMsg(boolean isWarning, String what, long value, long 
thresholdValue)
+    protected String errMsg(boolean isWarning, String what, long value, long 
thresholdValue)
     {
         return messageProvider.createMessage(isWarning,
                                              what,
-                                             value,
-                                             thresholdValue);
+                                             Long.toString(value),
+                                             Long.toString(thresholdValue));
     }
 
     private String redactedErrMsg(boolean isWarning, long value, long 
thresholdValue)
@@ -108,6 +108,16 @@ public class Threshold extends Guardrail
         return enabled(state) && (value > Math.min(failValue(state), 
warnValue(state)));
     }
 
+    public boolean warnsOn(long value, @Nullable ClientState state)
+    {
+        return enabled(state) && (value > warnValue(state) && value <= 
failValue(state));
+    }
+
+    public boolean failsOn(long value, @Nullable ClientState state)
+    {
+        return enabled(state) && (value > failValue(state));
+    }
+
     /**
      * Apply the guardrail to the provided value, warning or failing if 
appropriate.
      *
@@ -163,6 +173,6 @@ public class Threshold extends Guardrail
          * @param value     The value that triggered the guardrail (as a 
string).
          * @param threshold The threshold that was passed to trigger the 
guardrail (as a string).
          */
-        String createMessage(boolean isWarning, String what, long value, long 
threshold);
+        String createMessage(boolean isWarning, String what, String value, 
String threshold);
     }
 }
diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java 
b/src/java/org/apache/cassandra/gms/ApplicationState.java
index 4e20d62048..c45d3c2602 100644
--- a/src/java/org/apache/cassandra/gms/ApplicationState.java
+++ b/src/java/org/apache/cassandra/gms/ApplicationState.java
@@ -56,6 +56,7 @@ public enum ApplicationState
      * a comma-separated list.
      **/
     SSTABLE_VERSIONS,
+    DISK_USAGE,
     // DO NOT EDIT OR REMOVE PADDING STATES BELOW - only add new states above. 
 See CASSANDRA-16484
     X1,
     X2,
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java 
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 63e9487d7c..26644e17cc 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -172,6 +172,11 @@ public class VersionedValue implements 
Comparable<VersionedValue>
             return new VersionedValue(String.valueOf(load));
         }
 
+        public VersionedValue diskUsage(String state)
+        {
+            return new VersionedValue(state);
+        }
+
         public VersionedValue schema(UUID newVersion)
         {
             return new VersionedValue(newVersion.toString());
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index 93447c2f44..186ee5abc4 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -398,7 +398,7 @@ public abstract class SSTableWriter extends SSTable 
implements Transactional
 
     public static void guardCollectionSize(TableMetadata metadata, 
DecoratedKey partitionKey, Unfiltered unfiltered)
     {
-        if (!Guardrails.collectionSize.enabled(null) && 
!Guardrails.itemsPerCollection.enabled(null))
+        if (!Guardrails.collectionSize.enabled() && 
!Guardrails.itemsPerCollection.enabled())
             return;
 
         if (!unfiltered.isRow() || 
SchemaConstants.isSystemKeyspace(metadata.keyspace))
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 0e8b2570fe..6562635dca 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -72,6 +72,7 @@ import 
org.apache.cassandra.fql.FullQueryLoggerOptionsCompositeData;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict;
 import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
 import org.apache.cassandra.utils.concurrent.Future;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.utils.concurrent.FutureCombiner;
@@ -1034,6 +1035,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
             gossipSnitchInfo();
             Schema.instance.startSync();
             LoadBroadcaster.instance.startBroadcasting();
+            DiskUsageBroadcaster.instance.startBroadcasting();
             HintsService.instance.startDispatch();
             BatchlogManager.instance.start();
             snapshotManager.start();
diff --git 
a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java 
b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
new file mode 100644
index 0000000000..4504ac7f62
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.NoSpamLogger;
+
+/**
+ * Starts {@link DiskUsageMonitor} to monitor local disk usage state and 
broadcast new state via Gossip.
+ * At the same time, it caches cluster's disk usage state received via Gossip.
+ */
+public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(DiskUsageBroadcaster.class);
+    private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 10, TimeUnit.MINUTES);
+
+    public static final DiskUsageBroadcaster instance = new 
DiskUsageBroadcaster(DiskUsageMonitor.instance);
+
+    private final DiskUsageMonitor monitor;
+    private final ConcurrentMap<InetAddressAndPort, DiskUsageState> usageInfo 
= new ConcurrentHashMap<>();
+    private volatile boolean hasStuffedOrFullNode = false;
+
+    @VisibleForTesting
+    public DiskUsageBroadcaster(DiskUsageMonitor monitor)
+    {
+        this.monitor = monitor;
+        Gossiper.instance.register(this);
+    }
+
+    /**
+     * @return {@code true} if any node in the cluster is STUFFED OR FULL
+     */
+    public boolean hasStuffedOrFullNode()
+    {
+        return hasStuffedOrFullNode;
+    }
+
+    /**
+     * @return {@code true} if given node's disk usage is FULL
+     */
+    public boolean isFull(InetAddressAndPort endpoint)
+    {
+        return state(endpoint).isFull();
+    }
+
+    /**
+     * @return {@code true} if given node's disk usage is STUFFED
+     */
+    public boolean isStuffed(InetAddressAndPort endpoint)
+    {
+        return state(endpoint).isStuffed();
+    }
+
+    @VisibleForTesting
+    public DiskUsageState state(InetAddressAndPort endpoint)
+    {
+        return usageInfo.getOrDefault(endpoint, DiskUsageState.NOT_AVAILABLE);
+    }
+
+    public void startBroadcasting()
+    {
+        monitor.start(newState -> {
+
+            if (logger.isTraceEnabled())
+                logger.trace("Disseminating disk usage info: {}", newState);
+
+            
Gossiper.instance.addLocalApplicationState(ApplicationState.DISK_USAGE,
+                                                       
StorageService.instance.valueFactory.diskUsage(newState.name()));
+        });
+    }
+
+    @Override
+    public void onChange(InetAddressAndPort endpoint, ApplicationState state, 
VersionedValue value)
+    {
+        if (state != ApplicationState.DISK_USAGE)
+            return;
+
+        DiskUsageState usageState = DiskUsageState.NOT_AVAILABLE;
+        try
+        {
+            usageState = DiskUsageState.valueOf(value.value);
+        }
+        catch (IllegalArgumentException e)
+        {
+            noSpamLogger.warn(String.format("Found unknown DiskUsageState: %s. 
Using default state %s instead.",
+                                            value.value, usageState));
+        }
+        usageInfo.put(endpoint, usageState);
+
+        hasStuffedOrFullNode = usageState.isStuffedOrFull() || 
computeHasStuffedOrFullNode();
+    }
+
+    private boolean computeHasStuffedOrFullNode()
+    {
+        for (DiskUsageState replicaState : usageInfo.values())
+        {
+            if (replicaState.isStuffedOrFull())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public void onJoin(InetAddressAndPort endpoint, EndpointState epState)
+    {
+        updateDiskUsage(endpoint, epState);
+    }
+
+    @Override
+    public void beforeChange(InetAddressAndPort endpoint, EndpointState 
currentState, ApplicationState newStateKey, VersionedValue newValue)
+    {
+        // nothing to do here
+    }
+
+    @Override
+    public void onAlive(InetAddressAndPort endpoint, EndpointState state)
+    {
+        updateDiskUsage(endpoint, state);
+    }
+
+    @Override
+    public void onDead(InetAddressAndPort endpoint, EndpointState state)
+    {
+        // do nothing, as we don't care about dead nodes
+    }
+
+    @Override
+    public void onRestart(InetAddressAndPort endpoint, EndpointState state)
+    {
+        updateDiskUsage(endpoint, state);
+    }
+
+    @Override
+    public void onRemove(InetAddressAndPort endpoint)
+    {
+        usageInfo.remove(endpoint);
+        hasStuffedOrFullNode = 
usageInfo.values().stream().anyMatch(DiskUsageState::isStuffedOrFull);
+    }
+
+    private void updateDiskUsage(InetAddressAndPort endpoint, EndpointState 
state)
+    {
+        VersionedValue localValue = 
state.getApplicationState(ApplicationState.DISK_USAGE);
+
+        if (localValue != null)
+        {
+            onChange(endpoint, ApplicationState.DISK_USAGE, localValue);
+        }
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java 
b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java
new file mode 100644
index 0000000000..7395c5fb3b
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Memtable;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Schedule periodic task to monitor local disk usage and notify {@link 
DiskUsageBroadcaster} if local state changed.
+ */
+public class DiskUsageMonitor
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(DiskUsageMonitor.class);
+
+    public static DiskUsageMonitor instance = new DiskUsageMonitor();
+
+    private final Supplier<GuardrailsConfig> guardrailsConfigSupplier = () -> 
Guardrails.CONFIG_PROVIDER.getOrCreate(null);
+    private final Supplier<Multimap<FileStore, Directories.DataDirectory>> 
dataDirectoriesSupplier;
+
+    private volatile DiskUsageState localState = DiskUsageState.NOT_AVAILABLE;
+
+    @VisibleForTesting
+    public DiskUsageMonitor()
+    {
+        this.dataDirectoriesSupplier = 
DiskUsageMonitor::dataDirectoriesGroupedByFileStore;
+    }
+
+    @VisibleForTesting
+    public DiskUsageMonitor(Supplier<Multimap<FileStore, 
Directories.DataDirectory>> dataDirectoriesSupplier)
+    {
+        this.dataDirectoriesSupplier = dataDirectoriesSupplier;
+    }
+
+    /**
+     * Start monitoring local disk usage and call notifier when local disk 
usage state changed.
+     */
+    public void start(Consumer<DiskUsageState> notifier)
+    {
+        // start the scheduler regardless of whether the guardrail is enabled, so we can 
enable it later without a restart
+        ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> {
+
+            if (!Guardrails.localDataDiskUsage.enabled(null))
+                return;
+
+            updateLocalState(getDiskUsage(), notifier);
+        }, 0, 
CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong(), 
TimeUnit.MILLISECONDS);
+    }
+
+    @VisibleForTesting
+    public void updateLocalState(double usageRatio, Consumer<DiskUsageState> 
notifier)
+    {
+        double percentage = usageRatio * 100;
+        long percentageCeiling = (long) Math.ceil(percentage);
+
+        DiskUsageState state = getState(percentageCeiling);
+
+        Guardrails.localDataDiskUsage.guard(percentageCeiling, 
state.toString(), false, null);
+
+        // if state remains unchanged, no need to notify peers
+        if (state == localState)
+            return;
+
+        localState = state;
+        notifier.accept(state);
+    }
+
+    /**
+     * @return local node disk usage state
+     */
+    @VisibleForTesting
+    public DiskUsageState state()
+    {
+        return localState;
+    }
+
+    /**
+     * @return The current disk usage (including all memtable sizes) ratio. 
This is the ratio between the space taken by
+     * all the data directories and the addition of that same space and the 
free available space on disk. The space
+     * taken by the data directories is the addition of the actual space on 
disk plus the size of the memtables.
+     * Memtables are included in that calculation because they are expected to 
be eventually flushed to disk.
+     */
+    @VisibleForTesting
+    public double getDiskUsage()
+    {
+        // using BigInteger to handle large file system
+        BigInteger used = BigInteger.ZERO; // space used by data directories
+        BigInteger usable = BigInteger.ZERO; // free space on disks
+
+        for (Map.Entry<FileStore, Collection<Directories.DataDirectory>> e : 
dataDirectoriesSupplier.get().asMap().entrySet())
+        {
+            usable = usable.add(BigInteger.valueOf(usableSpace(e.getKey())));
+
+            for (Directories.DataDirectory dir : e.getValue())
+                used = used.add(BigInteger.valueOf(dir.getRawSize()));
+        }
+
+        // The total disk size for data directories is the space that is 
actually used by those directories plus the
+        // free space on disk that might be used for storing those directories 
in the future.
+        BigInteger total = used.add(usable);
+
+        // That total space can be limited by the config property 
data_disk_usage_max_disk_size.
+        DataStorageSpec diskUsageMaxSize = 
guardrailsConfigSupplier.get().getDataDiskUsageMaxDiskSize();
+        if (diskUsageMaxSize != null)
+            total = total.min(BigInteger.valueOf(diskUsageMaxSize.toBytes()));
+
+        // Add memtables size to the amount of used space because those 
memtables will be flushed to data directories.
+        used = used.add(BigInteger.valueOf(getAllMemtableSize()));
+
+        if (logger.isTraceEnabled())
+            logger.trace("Disk Usage Guardrail: current disk usage = {}, total 
disk usage = {}.",
+                         FileUtils.stringifyFileSize(used.doubleValue()),
+                         FileUtils.stringifyFileSize(total.doubleValue()));
+
+        return new BigDecimal(used).divide(new BigDecimal(total), 5, 
RoundingMode.UP).doubleValue();
+    }
+
+    @VisibleForTesting
+    public long getAllMemtableSize()
+    {
+        long size = 0;
+
+        for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+        {
+            for (Memtable memtable : 
cfs.getTracker().getView().getAllMemtables())
+            {
+                size += memtable.getLiveDataSize();
+            }
+        }
+
+        return size;
+    }
+
+    @VisibleForTesting
+    public DiskUsageState getState(long usagePercentage)
+    {
+        if (!Guardrails.localDataDiskUsage.enabled())
+            return DiskUsageState.NOT_AVAILABLE;
+
+        if (Guardrails.localDataDiskUsage.failsOn(usagePercentage, null))
+            return DiskUsageState.FULL;
+
+        if (Guardrails.localDataDiskUsage.warnsOn(usagePercentage, null))
+            return DiskUsageState.STUFFED;
+
+        return DiskUsageState.SPACIOUS;
+    }
+
+    public static Multimap<FileStore, Directories.DataDirectory> 
dataDirectoriesGroupedByFileStore()
+    {
+        Multimap<FileStore, Directories.DataDirectory> directories = 
HashMultimap.create();
+        try
+        {
+            for (Directories.DataDirectory dir : 
Directories.dataDirectories.getAllDirectories())
+            {
+                FileStore store = Files.getFileStore(dir.location.toPath());
+                directories.put(store, dir);
+            }
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Cannot get data directories grouped by 
file store", e);
+        }
+        return directories;
+    }
+
+    public static long totalSpace(FileStore store)
+    {
+        try
+        {
+            long size = store.getTotalSpace();
+            return size < 0 ? Long.MAX_VALUE : size;
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Cannot get total space of file store", 
e);
+        }
+    }
+
+    public static long usableSpace(FileStore store)
+    {
+        try
+        {
+            long size = store.getUsableSpace();
+            return size < 0 ? Long.MAX_VALUE : size;
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Cannot get usable size of file store", 
e);
+        }
+    }
+}
+
diff --git 
a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java 
b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java
new file mode 100644
index 0000000000..9a46251ff8
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.disk.usage;
+
+import org.apache.cassandra.db.guardrails.GuardrailsConfig;
+
+public enum DiskUsageState
+{
+    /** Either disk usage guardrail is not enabled or gossip state is not 
ready. */
+    NOT_AVAILABLE("Not Available"),
+
+    /**
+     * Disk usage is below both {@link 
GuardrailsConfig#getDataDiskUsagePercentageWarnThreshold()} and
+     * {@link GuardrailsConfig#getDataDiskUsagePercentageFailThreshold()}.
+     */
+    SPACIOUS("Spacious"),
+
+    /**
+     * Disk usage exceeds {@link 
GuardrailsConfig#getDataDiskUsagePercentageWarnThreshold()} but does not exceed
+     * {@link GuardrailsConfig#getDataDiskUsagePercentageFailThreshold()}.
+     */
+    STUFFED("Stuffed"),
+
+    /** Disk usage exceeds {@link 
GuardrailsConfig#getDataDiskUsagePercentageFailThreshold()}. */
+    FULL("Full");
+
+    private final String msg;
+
+    DiskUsageState(String msg)
+    {
+        this.msg = msg;
+    }
+
+    public boolean isFull()
+    {
+        return this == FULL;
+    }
+
+    public boolean isStuffed()
+    {
+        return this == STUFFED;
+    }
+
+    public boolean isStuffedOrFull()
+    {
+        return isFull() || isStuffed();
+    }
+
+    @Override
+    public String toString()
+    {
+        return msg;
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDiskUsageTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDiskUsageTest.java
new file mode 100644
index 0000000000..e1868e280f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailDiskUsageTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+
+/**
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} 
and {@link Guardrails#replicaDiskUsage}.
+ */
+public class GuardrailDiskUsageTest extends GuardrailTester
+{
+    private static final int NUM_ROWS = 100;
+
+    private static final String WARN_MESSAGE = "Replica disk usage exceeds 
warning threshold";
+    private static final String FAIL_MESSAGE = "Write request failed because 
disk usage exceeds failure threshold";
+
+    private static Cluster cluster;
+    private static com.datastax.driver.core.Cluster driverCluster;
+    private static Session driverSession;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        // speed up the task that calculates and propagates the disk usage info
+        CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.setInt(100);
+
+        // build a 2-node cluster with RF=1
+        cluster = init(Cluster.build(2)
+                              
.withInstanceInitializer(DiskStateInjection::install)
+                              .withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NATIVE_PROTOCOL)
+                                                
.set("data_disk_usage_percentage_warn_threshold", 98)
+                                                
.set("data_disk_usage_percentage_fail_threshold", 99)
+                                                .set("authenticator", 
"PasswordAuthenticator"))
+                              .start(), 1);
+
+        // create a regular user, since the default superuser is excluded from 
guardrails
+        com.datastax.driver.core.Cluster.Builder builder = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
+        try (com.datastax.driver.core.Cluster c = 
builder.withCredentials("cassandra", "cassandra").build();
+             Session session = c.connect())
+        {
+            session.execute("CREATE USER test WITH PASSWORD 'test'");
+        }
+
+        // connect using that regular user; we use the driver to get access to 
the client warnings
+        driverCluster = builder.withCredentials("test", "test").build();
+        driverSession = driverCluster.connect();
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (driverSession != null)
+            driverSession.close();
+
+        if (driverCluster != null)
+            driverCluster.close();
+
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Override
+    protected Cluster getCluster()
+    {
+        return cluster;
+    }
+
+    @Test
+    public void testDiskUsage() throws Throwable
+    {
+        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v int)");
+        String insert = format("INSERT INTO %s(k, v) VALUES (?, 0)");
+
+        // With both nodes in SPACIOUS state, we can write without warnings 
nor failures
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+
+        // If the disk usage information about one node becomes unavailable, 
we can still write without warnings
+        DiskStateInjection.setState(getCluster(), 2, 
DiskUsageState.NOT_AVAILABLE);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+
+        // If one node becomes STUFFED, the writes targeting that node will 
raise a warning, while the writes targeting
+        // the node that remains SPACIOUS will keep succeeding without warnings
+        DiskStateInjection.setState(getCluster(), 2, DiskUsageState.STUFFED);
+        int numWarnings = 0;
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+
+            List<String> warnings = rs.getExecutionInfo().getWarnings();
+            if (!warnings.isEmpty())
+            {
+                Assertions.assertThat(warnings).hasSize(1).anyMatch(s -> 
s.contains(WARN_MESSAGE));
+                numWarnings++;
+            }
+        }
+        
Assertions.assertThat(numWarnings).isGreaterThan(0).isLessThan(NUM_ROWS);
+
+        // If the STUFFED node becomes FULL, the writes targeting that node 
will fail, while the writes targeting
+        // the node that remains SPACIOUS will keep succeeding without warnings
+        DiskStateInjection.setState(getCluster(), 2, DiskUsageState.FULL);
+        int numFailures = 0;
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            try
+            {
+                ResultSet rs = driverSession.execute(insert, i);
+                
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+            }
+            catch (InvalidQueryException e)
+            {
+                Assertions.assertThat(e).hasMessageContaining(FAIL_MESSAGE);
+                numFailures++;
+            }
+        }
+        
Assertions.assertThat(numFailures).isGreaterThan(0).isLessThan(NUM_ROWS);
+
+        // If both nodes are FULL, all queries will fail
+        DiskStateInjection.setState(getCluster(), 1, DiskUsageState.FULL);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            try
+            {
+                driverSession.execute(insert, i);
+                Assertions.fail("Should have failed");
+            }
+            catch (InvalidQueryException e)
+            {
+                numFailures++;
+            }
+        }
+
+        // Finally, if both nodes go back to SPACIOUS, all queries will 
succeed again
+        DiskStateInjection.setState(getCluster(), 1, DiskUsageState.SPACIOUS);
+        DiskStateInjection.setState(getCluster(), 2, DiskUsageState.SPACIOUS);
+        for (int i = 0; i < NUM_ROWS; i++)
+        {
+            ResultSet rs = driverSession.execute(insert, i);
+            
Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
+        }
+    }
+
+    /**
+     * ByteBuddy rule to override the disk usage state of each node.
+     */
+    public static class DiskStateInjection
+    {
+        public static volatile DiskUsageState state = DiskUsageState.SPACIOUS;
+
+        private static void install(ClassLoader cl, int node)
+        {
+            new ByteBuddy().rebase(DiskUsageMonitor.class)
+                           .method(named("getState"))
+                           
.intercept(MethodDelegation.to(DiskStateInjection.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static void setState(Cluster cluster, int node, DiskUsageState 
state)
+        {
+            IInvokableInstance instance = cluster.get(node);
+            instance.runOnInstance(() -> DiskStateInjection.state = state);
+
+            // wait for disk usage state propagation, all nodes must see it
+            InetAddressAndPort enpoint = 
InetAddressAndPort.getByAddress(instance.broadcastAddress());
+            cluster.forEach(n -> n.runOnInstance(() -> 
Util.spinAssertEquals(state, () -> 
DiskUsageBroadcaster.instance.state(enpoint), 60)));
+        }
+
+        @SuppressWarnings("unused")
+        public static DiskUsageState getState(long usagePercentage, @SuperCall 
Callable<DiskUsageState> zuper)
+        {
+            return state;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java 
b/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java
index 66c644498d..4dad97a0a8 100644
--- a/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java
+++ b/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java
@@ -101,11 +101,36 @@ public class DataStorageSpecTest
     public void testEquals()
     {
         assertEquals(new DataStorageSpec("10B"), new DataStorageSpec("10B"));
+
         assertEquals(new DataStorageSpec("10KiB"), new 
DataStorageSpec("10240B"));
         assertEquals(new DataStorageSpec("10240B"), new 
DataStorageSpec("10KiB"));
+
+        assertEquals(new DataStorageSpec("10MiB"), new 
DataStorageSpec("10240KiB"));
+        assertEquals(new DataStorageSpec("10240KiB"), new 
DataStorageSpec("10MiB"));
+
+        assertEquals(new DataStorageSpec("10GiB"), new 
DataStorageSpec("10240MiB"));
+        assertEquals(new DataStorageSpec("10240MiB"), new 
DataStorageSpec("10GiB"));
+
+        assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE), 
DataStorageSpec.inGibibytes(Long.MAX_VALUE));
+        assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE), 
DataStorageSpec.inMebibytes(Long.MAX_VALUE));
+        assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE), 
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
+        assertEquals(DataStorageSpec.inBytes(Long.MAX_VALUE), 
DataStorageSpec.inBytes(Long.MAX_VALUE));
+
+        assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE), 
DataStorageSpec.inGibibytes(Long.MAX_VALUE));
+        assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE), 
DataStorageSpec.inMebibytes(Long.MAX_VALUE));
+        assertEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE), 
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
+        assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE), 
DataStorageSpec.inBytes(Long.MAX_VALUE));
+
+        assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), 
DataStorageSpec.inGibibytes(Long.MAX_VALUE));
         assertEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), 
DataStorageSpec.inMebibytes(Long.MAX_VALUE));
-        assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), 
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
         assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), 
DataStorageSpec.inBytes(Long.MAX_VALUE));
+        assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), 
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
+
+        assertEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE), 
DataStorageSpec.inGibibytes(Long.MAX_VALUE));
+        assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE), 
DataStorageSpec.inMebibytes(Long.MAX_VALUE));
+        assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE), 
DataStorageSpec.inKibibytes(Long.MAX_VALUE));
+        assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE), 
DataStorageSpec.inBytes(Long.MAX_VALUE));
+
         assertNotEquals(new DataStorageSpec("0MiB"), new 
DataStorageSpec("10KiB"));
     }
 
@@ -138,4 +163,4 @@ public class DataStorageSpecTest
         Gen<DataStorageSpec> gen = rs -> new 
DataStorageSpec(valueGen.generate(rs), unitGen.generate(rs));
         return gen.describedAs(DataStorageSpec::toString);
     }
-}
\ No newline at end of file
+}
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
index 482e37b2ef..1483e8101f 100644
--- 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
+++ 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java
@@ -48,12 +48,12 @@ public class GuardrailCollectionSizeTest extends 
ThresholdTester
 
     public GuardrailCollectionSizeTest()
     {
-        super(WARN_THRESHOLD / 1024, // to KiB
-              FAIL_THRESHOLD / 1024, // to KiB
+        super(WARN_THRESHOLD + "B",
+              FAIL_THRESHOLD + "B",
               Guardrails.collectionSize,
-              Guardrails::setCollectionSizeThresholdInKiB,
-              Guardrails::getCollectionSizeWarnThresholdInKiB,
-              Guardrails::getCollectionSizeFailThresholdInKiB);
+              Guardrails::setCollectionSizeThreshold,
+              Guardrails::getCollectionSizeWarnThreshold,
+              Guardrails::getCollectionSizeFailThreshold);
     }
 
     @After
diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java
new file mode 100644
index 0000000000..14c2c3acea
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.guardrails;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.nio.file.FileStore;
+import java.util.Arrays;
+import java.util.function.Consumer;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.DataStorageSpec;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster;
+import org.apache.cassandra.service.disk.usage.DiskUsageMonitor;
+import org.apache.cassandra.service.disk.usage.DiskUsageState;
+import org.apache.cassandra.utils.FBUtilities;
+import org.mockito.Mockito;
+
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
+import static 
org.apache.cassandra.service.disk.usage.DiskUsageState.NOT_AVAILABLE;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.SPACIOUS;
+import static org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} 
and {@link Guardrails#replicaDiskUsage}.
+ */
+public class GuardrailDiskUsageTest extends GuardrailTester
+{
+    private static int defaultDataDiskUsagePercentageWarnThreshold;
+    private static int defaultDataDiskUsagePercentageFailThreshold;
+
+    @BeforeClass
+    public static void beforeClass()
+    {
+        defaultDataDiskUsagePercentageWarnThreshold = 
Guardrails.instance.getDataDiskUsagePercentageWarnThreshold();
+        defaultDataDiskUsagePercentageFailThreshold = 
Guardrails.instance.getDataDiskUsagePercentageFailThreshold();
+
+        Guardrails.instance.setDataDiskUsagePercentageThreshold(-1, -1);
+    }
+
+    @AfterClass
+    public static void afterClass()
+    {
+        
Guardrails.instance.setDataDiskUsagePercentageThreshold(defaultDataDiskUsagePercentageWarnThreshold,
+                                                                
defaultDataDiskUsagePercentageFailThreshold);
+    }
+
+    @Test
+    public void testConfigValidation()
+    {
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize(null));
+        assertNull(guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0B"), "0 is not 
allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0KiB"), "0 is 
not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0MiB"), "0 is 
not allowed");
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0GiB"), "0 is 
not allowed");
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("10B"));
+        assertEquals("10B", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("20KiB"));
+        assertEquals("20KiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("30MiB"));
+        assertEquals("30MiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("40GiB"));
+        assertEquals("40GiB", guardrails().getDataDiskUsageMaxDiskSize());
+
+        assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize(Long.MAX_VALUE + 
"GiB"), "are actually available on disk");
+
+        // warn threshold smaller than lower bound
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(0, 80), 
"0 is not allowed");
+
+        // fail threshold bigger than upper bound
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(1, 110), 
"maximum allowed value is 100");
+
+        // warn threshold larger than fail threshold
+        assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(60, 50),
+                          "The warn threshold 60 for 
data_disk_usage_percentage_warn_threshold should be lower than the fail 
threshold 50");
+    }
+
+    @Test
+    public void testDiskUsageState()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(10));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+        // exceed warning threshold
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(56));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(90));
+
+        // exceed fail threshold
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+
+        // shouldn't be possible to go over 100% but just to be sure
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(101));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(500));
+    }
+
+    @Test
+    public void testDiskUsageDetectorWarnDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(-1, 90);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(0));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(90));
+
+        // exceed fail threshold
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(91));
+        assertEquals(FULL, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testDiskUsageDetectorFailDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, -1);
+
+        // under usage
+        assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50));
+
+        // exceed warning threshold
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(80));
+        assertEquals(STUFFED, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testDiskUsageGuardrailDisabled()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(-1, -1);
+
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(0));
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(60));
+        assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(100));
+    }
+
+    @Test
+    public void testMemtableSizeIncluded() throws Throwable
+    {
+        DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+        createTable(keyspace(), "CREATE TABLE %s (k text PRIMARY KEY, v text) 
WITH compression = { 'enabled': false }");
+
+        long memtableSizeBefore = monitor.getAllMemtableSize();
+        int rows = 10;
+        int mb = 1024 * 1024;
+
+        for (int i = 0; i < rows; i++)
+        {
+            char[] chars = new char[mb];
+            Arrays.fill(chars, (char) i);
+            String value = String.copyValueOf(chars);
+            execute("INSERT INTO %s (k, v) VALUES(?, ?)", i, value);
+        }
+
+        // verify memtables are included
+        long memtableSizeAfterInsert = monitor.getAllMemtableSize();
+        assertTrue(String.format("Expect at least 10MB more data, but got 
before: %s and after: %d",
+                                 memtableSizeBefore, memtableSizeAfterInsert),
+                   memtableSizeAfterInsert - memtableSizeBefore >= rows * mb);
+
+        // verify memtable size is reduced after flush
+        flush();
+        long memtableSizeAfterFlush = monitor.getAllMemtableSize();
+        assertEquals(memtableSizeBefore, memtableSizeAfterFlush, mb);
+    }
+
+    @Test
+    public void testMonitorLogsOnStateChange()
+    {
+        guardrails().setDataDiskUsagePercentageThreshold(50, 90);
+
+        Guardrails.localDataDiskUsage.resetLastNotifyTime();
+
+        DiskUsageMonitor monitor = new DiskUsageMonitor();
+
+        // transit to SPACIOUS, no logging
+        assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+
+        // transit to STUFFED, expect warning
+        assertMonitorStateTransition(0.50001, STUFFED, monitor, true, "Local 
data disk usage 51%(Stuffed) exceeds warning threshold of 50%");
+
+        // remain as STUFFED, no logging because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit to FULL, expect failure
+        assertMonitorStateTransition(0.90001, FULL, monitor, false, "Local 
data disk usage 91%(Full) exceeds failure threshold of 90%, will stop accepting 
writes");
+
+        // remain as FULL, no logging because of min log interval
+        assertMonitorStateTransition(0.99, FULL, monitor);
+
+        // remain as FULL, no logging because of min log interval
+        assertMonitorStateTransition(5.0, FULL, monitor);
+
+        // transit back to STUFFED, no warning  because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit back to FULL, no logging  because of min log interval
+        assertMonitorStateTransition(0.900001, FULL, monitor);
+
+        // transit back to STUFFED, no logging  because of min log interval
+        assertMonitorStateTransition(0.90, STUFFED, monitor);
+
+        // transit to SPACIOUS, no logging
+        assertMonitorStateTransition(0.50, SPACIOUS, monitor);
+    }
+
+    @Test
+    public void testDiskUsageBroadcaster() throws UnknownHostException
+    {
+        DiskUsageBroadcaster broadcaster = new DiskUsageBroadcaster(null);
+        Gossiper.instance.unregister(broadcaster);
+
+        InetAddressAndPort node1 = InetAddressAndPort.getByName("127.0.0.1");
+        InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.2");
+        InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.3");
+
+        // initially it's NOT_AVAILABLE
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node1));
+        assertFalse(broadcaster.isFull(node2));
+        assertFalse(broadcaster.isFull(node3));
+
+        // adding 1st node: Spacious, cluster has no Full node
+        broadcaster.onChange(node1, ApplicationState.DISK_USAGE, 
value(SPACIOUS));
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node1));
+
+        // adding 2nd node with wrong ApplicationState
+        broadcaster.onChange(node2, ApplicationState.RACK, value(FULL));
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node2));
+
+        // adding 2nd node: STUFFED
+        broadcaster.onChange(node2, ApplicationState.DISK_USAGE, 
value(STUFFED));
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertTrue(broadcaster.isStuffed(node2));
+
+        // adding 3rd node: FULL
+        broadcaster.onChange(node3, ApplicationState.DISK_USAGE, value(FULL));
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertTrue(broadcaster.isFull(node3));
+
+        // remove 2nd node, cluster has Full node
+        broadcaster.onRemove(node2);
+        assertTrue(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isStuffed(node2));
+
+        // remove 3rd node, cluster has no Full node
+        broadcaster.onRemove(node3);
+        assertFalse(broadcaster.hasStuffedOrFullNode());
+        assertFalse(broadcaster.isFull(node3));
+    }
+
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSize() throws IOException
+    {
+        Directories.DataDirectory directory = 
mock(Directories.DataDirectory.class);
+        
when(directory.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+
+        FileStore store = mock(FileStore.class);
+        
when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 
5).toBytes()); // 100GiB disk
+
+        Multimap<FileStore, Directories.DataDirectory> directories = 
HashMultimap.create();
+        directories.put(store, directory);
+        DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> 
directories));
+
+        doCallRealMethod().when(monitor).getDiskUsage();
+        doReturn(0L).when(monitor).getAllMemtableSize();
+
+        guardrails().setDataDiskUsageMaxDiskSize(null);
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.05);
+
+        // 5G are used of 10G
+        guardrails().setDataDiskUsageMaxDiskSize("10GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+        // max disk size = space used
+        guardrails().setDataDiskUsageMaxDiskSize("5GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+        // max disk size < space used
+        guardrails().setDataDiskUsageMaxDiskSize("1GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(5.0);
+    }
+
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSizeAndSmallUnits() throws 
IOException
+    {
+        // 5MiB used out of 100GiB disk
+        long freeDiskSizeInBytes = DataStorageSpec.inGibibytes(100).toBytes() 
- DataStorageSpec.inMebibytes(5).toBytes();
+
+        FileStore store = mock(FileStore.class);
+        
when(store.getUsableSpace()).thenReturn(DataStorageSpec.inBytes(freeDiskSizeInBytes).toBytes());
 // 100GiB disk
+
+        Directories.DataDirectory directory = 
mock(Directories.DataDirectory.class);
+        
when(directory.getRawSize()).thenReturn(DataStorageSpec.inMebibytes(5).toBytes());
+
+        Multimap<FileStore, Directories.DataDirectory> directories = 
HashMultimap.create();
+        directories.put(store, directory);
+        DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> 
directories));
+
+        doCallRealMethod().when(monitor).getDiskUsage();
+        doReturn(0L).when(monitor).getAllMemtableSize();
+
+        guardrails().setDataDiskUsageMaxDiskSize(null);
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.00005);
+
+        // 5MiB are used of 10MiB
+        guardrails().setDataDiskUsageMaxDiskSize("10MiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+        // max disk size = space used
+        guardrails().setDataDiskUsageMaxDiskSize("5MiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+        // max disk size < space used
+        guardrails().setDataDiskUsageMaxDiskSize("1MiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(5.0);
+    }
+
+    @Test
+    public void testDiskUsageCalculationWithMaxDiskSizeAndMultipleVolumes() 
throws IOException
+    {
+        Mockito.reset();
+
+        Multimap<FileStore, Directories.DataDirectory> directories = 
HashMultimap.create();
+
+        Directories.DataDirectory directory1 = 
mock(Directories.DataDirectory.class);
+        FileStore store1 = mock(FileStore.class);
+        
when(directory1.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+        
when(store1.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 
5).toBytes()); // 100 GiB disk
+        directories.put(store1, directory1);
+
+        Directories.DataDirectory directory2 = 
mock(Directories.DataDirectory.class);
+        FileStore store2 = mock(FileStore.class);
+        
when(directory2.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(25).toBytes());
+        
when(store2.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 
25).toBytes()); // 100 GiB disk
+        directories.put(store2, directory2);
+
+        Directories.DataDirectory directory3 = 
mock(Directories.DataDirectory.class);
+        FileStore store3 = mock(FileStore.class);
+        
when(directory3.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(20).toBytes());
+        
when(store3.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 
20).toBytes()); // 100 GiB disk
+        directories.put(store3, directory3);
+
+        DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> 
directories));
+
+        doCallRealMethod().when(monitor).getDiskUsage();
+        doReturn(0L).when(monitor).getAllMemtableSize();
+
+        // 50G/300G as each disk has a capacity of 100G
+        guardrails().setDataDiskUsageMaxDiskSize(null);
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.16667);
+
+        // 50G/100G
+        guardrails().setDataDiskUsageMaxDiskSize("100GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+        // 50G/75G
+        guardrails().setDataDiskUsageMaxDiskSize("75GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.66667);
+
+        // 50G/50G
+        guardrails().setDataDiskUsageMaxDiskSize("50GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+        // 50G/49G
+        guardrails().setDataDiskUsageMaxDiskSize("49GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.02041);
+    }
+
+    @Test
+    public void 
testDiskUsageCalculationWithMaxDiskSizeAndMultipleDirectories() throws 
IOException
+    {
+        Mockito.reset();
+
+        Directories.DataDirectory directory1 = 
mock(Directories.DataDirectory.class);
+        
when(directory1.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes());
+
+        Directories.DataDirectory directory2 = 
mock(Directories.DataDirectory.class);
+        
when(directory2.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(25).toBytes());
+
+        Directories.DataDirectory directory3 = 
mock(Directories.DataDirectory.class);
+        
when(directory3.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(20).toBytes());
+
+        FileStore store = mock(FileStore.class);
+        
when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(300 - 5 - 
25 - 20).toBytes()); // 300 GiB disk
+
+        Multimap<FileStore, Directories.DataDirectory> directories = 
HashMultimap.create();
+        directories.putAll(store, ImmutableSet.of(directory1, directory2, 
directory3));
+
+        DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> 
directories));
+
+        doCallRealMethod().when(monitor).getDiskUsage();
+        doReturn(0L).when(monitor).getAllMemtableSize();
+
+        // 50G/300G as disk has a capacity of 300G
+        guardrails().setDataDiskUsageMaxDiskSize(null);
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.16667);
+
+        // 50G/100G
+        guardrails().setDataDiskUsageMaxDiskSize("100GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.5);
+
+        // 50G/75G
+        guardrails().setDataDiskUsageMaxDiskSize("75GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(0.66667);
+
+        // 50G/50G
+        guardrails().setDataDiskUsageMaxDiskSize("50GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.0);
+
+        // 50G/49G
+        guardrails().setDataDiskUsageMaxDiskSize("49GiB");
+        assertThat(monitor.getDiskUsage()).isEqualTo(1.02041);
+    }
+
+    @Test
+    public void testWriteRequests() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)");
+
+        InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
+        InetAddressAndPort node1 = InetAddressAndPort.getByName("127.0.0.11");
+        InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.21");
+        InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.31");
+
+        Guardrails.replicaDiskUsage.resetLastNotifyTime();
+        guardrails().setDataDiskUsagePercentageThreshold(98, 99);
+
+        ConsistencyLevel cl = ConsistencyLevel.LOCAL_QUORUM;
+        String select = "SELECT * FROM %s";
+        String insert = "INSERT INTO %s (k, v) VALUES (0, 0)";
+        String batch = "BEGIN BATCH " +
+                       "INSERT INTO %s (k, v) VALUES (1, 1);" +
+                       "INSERT INTO %<s (k, v) VALUES (2, 2); " +
+                       "APPLY BATCH";
+        CheckedFunction userSelect = () -> execute(userClientState, select, 
cl);
+        CheckedFunction userInsert = () -> execute(userClientState, insert, 
cl);
+        CheckedFunction userBatch = () -> execute(userClientState, batch, cl);
+
+        // default state, write request works fine
+        assertValid(userSelect);
+        assertValid(userInsert);
+        assertValid(userBatch);
+
+        // verify node1 NOT_AVAILABLE won't affect writes
+        setDiskUsageState(node1, NOT_AVAILABLE);
+        assertValid(userSelect);
+        assertValid(userInsert);
+        assertValid(userBatch);
+
+        // verify node2 Spacious won't affect writes
+        setDiskUsageState(node2, SPACIOUS);
+        assertValid(userSelect);
+        assertValid(userInsert);
+        assertValid(userBatch);
+
+        // verify node3 STUFFED won't trigger warning as it's not write replica
+        setDiskUsageState(node3, STUFFED);
+        assertValid(userSelect);
+        assertValid(userInsert);
+        assertValid(userBatch);
+
+        // verify node3 Full won't affect writes as it's not write replica
+        setDiskUsageState(node3, FULL);
+        assertValid(userSelect);
+        assertValid(userInsert);
+        assertValid(userBatch);
+
+        // verify local node STUFFED, will log warning
+        setDiskUsageState(local, STUFFED);
+        assertValid(userSelect);
+        assertWarns(userInsert);
+        assertWarns(userBatch);
+
+        // verify local node Full, will reject writes
+        setDiskUsageState(local, FULL);
+        assertValid(userSelect);
+        assertFails(userInsert);
+        assertFails(userBatch);
+
+        // excluded users can write to FULL cluster
+        useSuperUser();
+        Guardrails.replicaDiskUsage.resetLastNotifyTime();
+        for (ClientState excludedUser : Arrays.asList(systemClientState, 
superClientState))
+        {
+            assertValid(() -> execute(excludedUser, select, cl));
+            assertValid(() -> execute(excludedUser, insert, cl));
+            assertValid(() -> execute(excludedUser, batch, cl));
+        }
+
+        // verify local node STUFFED won't reject writes
+        setDiskUsageState(local, STUFFED);
+        assertValid(userSelect);
+        assertWarns(userInsert);
+        assertWarns(userBatch);
+    }
+
+    @Override
+    protected void assertValid(CheckedFunction function) throws Throwable
+    {
+        Guardrails.replicaDiskUsage.resetLastNotifyTime();
+        super.assertValid(function);
+    }
+
+    protected void assertWarns(CheckedFunction function) throws Throwable
+    {
+        Guardrails.replicaDiskUsage.resetLastNotifyTime();
+        super.assertWarns(function, "Replica disk usage exceeds warning 
threshold");
+    }
+
+    protected void assertFails(CheckedFunction function) throws Throwable
+    {
+        Guardrails.replicaDiskUsage.resetLastNotifyTime();
+        super.assertFails(function, "Write request failed because disk usage 
exceeds failure threshold");
+    }
+
+    private static void setDiskUsageState(InetAddressAndPort endpoint, 
DiskUsageState state)
+    {
+        DiskUsageBroadcaster.instance.onChange(endpoint, 
ApplicationState.DISK_USAGE, value(state));
+    }
+
+    private static VersionedValue value(DiskUsageState state)
+    {
+        return StorageService.instance.valueFactory.diskUsage(state.name());
+    }
+
+    private void assertMonitorStateTransition(double usageRatio, 
DiskUsageState state, DiskUsageMonitor monitor)
+    {
+        assertMonitorStateTransition(usageRatio, state, monitor, false, null);
+    }
+
+    private void assertMonitorStateTransition(double usageRatio, 
DiskUsageState state, DiskUsageMonitor monitor,
+                                              boolean isWarn, String msg)
+    {
+        boolean stateChanged = state != monitor.state();
+        Consumer<DiskUsageState> notifier = newState -> {
+            if (stateChanged)
+                assertEquals(state, newState);
+            else
+                fail("Expect no notification if state remains the same");
+        };
+
+        monitor.updateLocalState(usageRatio, notifier);
+        assertEquals(state, monitor.state());
+
+        if (msg == null)
+        {
+            listener.assertNotFailed();
+            listener.assertNotWarned();
+        }
+        else if (isWarn)
+        {
+            listener.assertWarned(msg);
+            listener.assertNotFailed();
+        }
+        else
+        {
+            listener.assertFailed(msg);
+            listener.assertNotWarned();
+        }
+
+        listener.clear();
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java
index 74747bbe5a..986e1d698f 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java
@@ -427,6 +427,11 @@ public abstract class GuardrailTester extends CQLTester
                          .collect(Collectors.toList());
     }
 
+    protected void assertConfigValid(Consumer<Guardrails> consumer)
+    {
+        consumer.accept(guardrails());
+    }
+
     protected void assertConfigFails(Consumer<Guardrails> consumer, String 
message)
     {
         try
@@ -533,6 +538,11 @@ public abstract class GuardrailTester extends CQLTester
             assertTrue(format("Expect no warning diagnostic events but got 
%s", warnings), warnings.isEmpty());
         }
 
+        public void assertWarned(String message)
+        {
+            assertWarned(Collections.singletonList(message));
+        }
+
         public void assertWarned(List<String> messages)
         {
             assertFalse("Expected to emit warning diagnostic event, but no 
warning was emitted", warnings.isEmpty());
diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java 
b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
index b98659e9a5..b278e7f6f8 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java
@@ -293,6 +293,52 @@ public class GuardrailsTest extends GuardrailTester
         assertValid(() -> disallowed.guard(set(4), action, superClientState));
     }
 
+    @Test
+    public void testPredicates() throws Throwable
+    {
+        Predicates<Integer> guard = new Predicates<>("x",
+                                                     state -> x -> x > 10,
+                                                     state -> x -> x > 100,
+                                                     (isWarn, value) -> format("%s: %s", isWarn ? "Warning" : "Aborting", value));
+
+        assertValid(() -> guard.guard(5, userClientState));
+        assertWarns(() -> guard.guard(25, userClientState), "Warning: 25");
+        assertWarns(() -> guard.guard(100,  userClientState), "Warning: 100");
+        assertFails(() -> guard.guard(101,  userClientState), "Aborting: 101");
+        assertFails(() -> guard.guard(200,  userClientState), "Aborting: 200");
+        assertValid(() -> guard.guard(5,  userClientState));
+    }
+
+    @Test
+    public void testPredicatesUsers() throws Throwable
+    {
+        Predicates<Integer> guard = new Predicates<>("x",
+                                                     state -> x -> x > 10,
+                                                     state -> x -> x > 100,
+                                                     (isWarn, value) -> format("%s: %s", isWarn ? "Warning" : "Aborting", value));
+
+        assertTrue(guard.enabled());
+        assertTrue(guard.enabled(null));
+        assertTrue(guard.enabled(userClientState));
+        assertFalse(guard.enabled(systemClientState));
+        assertFalse(guard.enabled(superClientState));
+
+        assertValid(() -> guard.guard(5, null));
+        assertValid(() -> guard.guard(5, userClientState));
+        assertValid(() -> guard.guard(5, systemClientState));
+        assertValid(() -> guard.guard(5, superClientState));
+
+        assertWarns(() -> guard.guard(25, null), "Warning: 25");
+        assertWarns(() -> guard.guard(25, userClientState), "Warning: 25");
+        assertValid(() -> guard.guard(25, systemClientState));
+        assertValid(() -> guard.guard(25, superClientState));
+
+        assertFails(() -> guard.guard(101,  null), false, "Aborting: 101");
+        assertFails(() -> guard.guard(101,  userClientState), "Aborting: 101");
+        assertValid(() -> guard.guard(101, systemClientState));
+        assertValid(() -> guard.guard(101, superClientState));
+    }
+
     private static Set<Integer> set(Integer value)
     {
         return Collections.singleton(value);
diff --git a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
index 885f626881..a04cc9933b 100644
--- a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
+++ b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java
@@ -21,13 +21,14 @@ package org.apache.cassandra.db.guardrails;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.function.ToIntFunction;
 import java.util.function.ToLongFunction;
 
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DataStorageSpec;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.assertj.core.api.Assertions;
 
@@ -47,7 +48,7 @@ public abstract class ThresholdTester extends GuardrailTester
     private final ToLongFunction<Guardrails> warnGetter;
     private final ToLongFunction<Guardrails> failGetter;
     private final long maxValue;
-    private final long disabledValue;
+    private final Long disabledValue;
 
     protected ThresholdTester(int warnThreshold,
                               int failThreshold,
@@ -63,7 +64,7 @@ public abstract class ThresholdTester extends GuardrailTester
         this.warnGetter = g -> (long) warnGetter.applyAsInt(g);
         this.failGetter = g -> (long) failGetter.applyAsInt(g);
         maxValue = Integer.MAX_VALUE;
-        disabledValue = Config.DISABLED_GUARDRAIL;
+        disabledValue = -1L;
     }
 
     protected ThresholdTester(long warnThreshold,
@@ -80,7 +81,24 @@ public abstract class ThresholdTester extends GuardrailTester
         this.warnGetter = warnGetter;
         this.failGetter = failGetter;
         maxValue = Long.MAX_VALUE;
-        disabledValue = Config.DISABLED_SIZE_GUARDRAIL.toBytes();
+        disabledValue = -1L;
+    }
+
+    protected ThresholdTester(String warnThreshold,
+                              String failThreshold,
+                              Threshold threshold,
+                              TriConsumer<Guardrails, String, String> setter,
+                              Function<Guardrails, String> warnGetter,
+                              Function<Guardrails, String> failGetter)
+    {
+        super(threshold);
+        this.warnThreshold = new DataStorageSpec(warnThreshold).toBytes();
+        this.failThreshold = new DataStorageSpec(failThreshold).toBytes();
+        this.setter = (g, w, a) -> setter.accept(g, w == null ? null : DataStorageSpec.inBytes(w).toString(), a == null ? null : DataStorageSpec.inBytes(a).toString());
+        this.warnGetter = g -> new DataStorageSpec(warnGetter.apply(g)).toBytes();
+        this.failGetter = g -> new DataStorageSpec(failGetter.apply(g)).toBytes();
+        maxValue = Long.MAX_VALUE;
+        disabledValue = null;
     }
 
     protected long currentValue()
@@ -225,7 +243,7 @@ public abstract class ThresholdTester extends GuardrailTester
         assertInvalidStrictlyPositiveProperty(setter, Integer.MIN_VALUE, name);
         assertInvalidStrictlyPositiveProperty(setter, -2, name);
         assertValidProperty(setter, disabledValue);
-        assertInvalidStrictlyPositiveProperty(setter, disabledValue == 0 ? -1 : 0, name);
+        assertInvalidStrictlyPositiveProperty(setter, disabledValue == null ? -1 : 0, name);
         assertValidProperty(setter, 1L);
         assertValidProperty(setter, 2L);
         assertValidProperty(setter, maxValue);
diff --git a/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java b/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
index a2bce7d3d6..3f859dcf99 100644
--- a/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java
@@ -80,7 +80,7 @@ public class GossipInfoTableTest extends CQLTester
             assertThat(entry).isNotEmpty();
 
             UntypedResultSet.Row row = resultSet.one();
-            assertThat(row.getColumns().size()).isEqualTo(62);
+            assertThat(row.getColumns().size()).isEqualTo(64);
 
             InetAddressAndPort endpoint = entry.get().getKey();
             EndpointState localState = entry.get().getValue();
@@ -109,6 +109,7 @@ public class GossipInfoTableTest extends CQLTester
             assertValue(row, "native_address_and_port", localState, ApplicationState.NATIVE_ADDRESS_AND_PORT);
             assertValue(row, "status_with_port", localState, ApplicationState.STATUS_WITH_PORT);
             assertValue(row, "sstable_versions", localState, ApplicationState.SSTABLE_VERSIONS);
+            assertValue(row, "disk_usage", localState, ApplicationState.DISK_USAGE);
             assertValue(row, "x_11_padding", localState, ApplicationState.X_11_PADDING);
             assertValue(row, "x1", localState, ApplicationState.X1);
             assertValue(row, "x2", localState, ApplicationState.X2);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to