This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new b3842de5cf Add guardrail for data disk usage b3842de5cf is described below commit b3842de5cf1fa1b81872effb4585fbc7e1873d59 Author: Andrés de la Peña <a.penya.gar...@gmail.com> AuthorDate: Fri Apr 22 16:36:07 2022 +0100 Add guardrail for data disk usage patch by Andrés de la Peña; reviewed by Ekaterina Dimitrova and Stefan Miklosovic for CASSANDRA-17150 Co-authored-by: Andrés de la Peña <a.penya.gar...@gmail.com> Co-authored-by: Zhao Yang <jasonstack.z...@gmail.com> Co-authored-by: Eduard Tudenhoefner <etudenhoef...@gmail.com> --- CHANGES.txt | 1 + NEWS.txt | 28 + conf/cassandra.yaml | 24 +- .../config/CassandraRelevantProperties.java | 8 + src/java/org/apache/cassandra/config/Config.java | 49 +- .../apache/cassandra/config/DataStorageSpec.java | 13 +- .../apache/cassandra/config/GuardrailsOptions.java | 121 +++- .../org/apache/cassandra/cql3/QueryOptions.java | 9 +- .../cassandra/cql3/selection/ResultSetBuilder.java | 5 +- .../cassandra/cql3/statements/BatchStatement.java | 8 +- .../cql3/statements/ModificationStatement.java | 25 + src/java/org/apache/cassandra/db/Directories.java | 5 + src/java/org/apache/cassandra/db/ReadCommand.java | 8 +- .../apache/cassandra/db/guardrails/Guardrail.java | 92 ++- .../apache/cassandra/db/guardrails/Guardrails.java | 112 +++- .../cassandra/db/guardrails/GuardrailsConfig.java | 25 +- .../cassandra/db/guardrails/GuardrailsMBean.java | 61 +- .../db/guardrails/PercentageThreshold.java | 56 ++ .../apache/cassandra/db/guardrails/Predicates.java | 93 ++++ .../apache/cassandra/db/guardrails/Threshold.java | 20 +- .../org/apache/cassandra/gms/ApplicationState.java | 1 + .../org/apache/cassandra/gms/VersionedValue.java | 5 + .../cassandra/io/sstable/format/SSTableWriter.java | 2 +- .../apache/cassandra/service/StorageService.java | 2 + .../service/disk/usage/DiskUsageBroadcaster.java | 181 ++++++ .../service/disk/usage/DiskUsageMonitor.java | 233 ++++++++ 
.../service/disk/usage/DiskUsageState.java | 70 +++ .../test/guardrails/GuardrailDiskUsageTest.java | 225 ++++++++ .../cassandra/config/DataStorageSpecTest.java | 29 +- .../db/guardrails/GuardrailCollectionSizeTest.java | 10 +- .../db/guardrails/GuardrailDiskUsageTest.java | 617 +++++++++++++++++++++ .../cassandra/db/guardrails/GuardrailTester.java | 10 + .../cassandra/db/guardrails/GuardrailsTest.java | 46 ++ .../cassandra/db/guardrails/ThresholdTester.java | 28 +- .../cassandra/db/virtual/GossipInfoTableTest.java | 3 +- 35 files changed, 2125 insertions(+), 100 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index a1213090e2..9e9e1ee2f1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Add guardrail for data disk usage (CASSANDRA-17150) * Tool to list data paths of existing tables (CASSANDRA-17568) * Migrate track_warnings to more standard naming conventions and use latest configuration types rather than long (CASSANDRA-17560) * Add support for CONTAINS and CONTAINS KEY in conditional UPDATE and DELETE statement (CASSANDRA-10537) diff --git a/NEWS.txt b/NEWS.txt index a891eb3a9a..fd31e06c93 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -56,6 +56,34 @@ using the provided 'sstableupgrade' tool. New features ------------ + - Added a new guardrails framework allowing to define soft/hard limits for different user actions, such as limiting + the number of tables, columns per table or the size of collections. These guardrails are only applied to regular + user queries, and superusers and internal queries are excluded. Reaching the soft limit raises a client warning, + whereas reaching the hard limit aborts the query. In both cases a log message and a diagnostic event are emitted. + Additionally, some guardrails are not linked to specific user queries due to techincal limitations, such as + detecting the size of large collections during compaction or periodically monitoring the disk usage. 
These + guardrails would only emit the proper logs and diagnostic events when triggered, without aborting any processes. + Guardrails config is defined through cassandra.yaml properties, and they can be dynamically updated through the + JMX MBean `org.apache.cassandra.db:type=Guardrails`. There are guardrails for: + - Number of user keyspaces. + - Number of user tables. + - Number of columns per table. + - Number of secondary indexes per table. + - Number of materialized tables per table. + - Number of fields per user-defined type. + - Number of items in a collection . + - Number of partition keys selected by an IN restriction. + - Number of partition keys selected by the cartesian product of multiple IN restrictions. + - Allowed table properties. + - Allowed read consistency levels. + - Allowed write consistency levels. + - Collections size. + - Query page size. + - Data disk usage, defined either as a percentage or as an absolute size. + - Whether user-defined timestamps are allowed. + - Whether GROUP BY queries are allowed. + - Whether the creation of secondary indexes is allowed. + - Whether the creation of uncompressed tables is allowed. - Add support for the use of pure monotonic functions on the last attribute of the GROUP BY clause. - Add floor functions that can be use to group by time range. - Support for native transport rate limiting via native_transport_rate_limiting_enabled and diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index c455d45950..b28f4388f5 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -1653,9 +1653,9 @@ drop_compact_storage_enabled: false # of non-frozen collections there could be unaccounted parts of the collection on the sstables. This is done this way to # prevent read-before-write. The guardrail is also checked at sstable write time to detect large non-frozen collections, # although in that case exceeding the fail threshold will only log an error message, without interrupting the operation. 
-# The two thresholds default to 0KiB to disable. -# collection_size_warn_threshold: 0KiB -# collection_size_fail_threshold: 0KiB +# The two thresholds default to null to disable. +# collection_size_warn_threshold: +# collection_size_fail_threshold: # Guardrail to warn or fail when encountering more elements in collection than threshold. # At query time this guardrail is applied only to the collection fragment that is being written, even though in the case # of non-frozen collections there could be unaccounted parts of the collection on the sstables. This is done this way to @@ -1668,6 +1668,24 @@ drop_compact_storage_enabled: false # Default -1 to disable. # fields_per_udt_warn_threshold: -1 # fields_per_udt_fail_threshold: -1 +# Guardrail to warn or fail when local data disk usage percentage exceeds threshold. Valid values are in [1, 100]. +# This is only used for the disks storing data directories, so it won't count any separate disks used for storing +# the commitlog, hints nor saved caches. The disk usage is the ratio between the amount of space used by the data +# directories and the addition of that same space and the remaining free space on disk. The main purpose of this +# guardrail is rejecting user writes when the disks are over the defined usage percentage, so the writes done by +# background processes such as compaction and streaming don't fail due to a full disk. The limits should be defined +# accordingly to the expected data growth due to those background processes, so for example a compaction strategy +# doubling the size of the data would require to keep the disk usage under 50%. +# The two thresholds default to -1 to disable. 
+# data_disk_usage_percentage_warn_threshold: -1 +# data_disk_usage_percentage_fail_threshold: -1 +# Allows defining the max disk size of the data directories when calculating thresholds for +# disk_usage_percentage_warn_threshold and disk_usage_percentage_fail_threshold, so if this is greater than zero they +# become percentages of a fixed size on disk instead of percentages of the physically available disk size. This should +# be useful when we have a large disk and we only want to use a part of it for Cassandra's data directories. +# Valid values are in [1, max available disk size of all data directories]. +# Defaults to null to disable and use the physically available disk size of data directories during calculations. +# data_disk_usage_max_disk_size: # Startup Checks are executed as part of Cassandra startup process, not all of them # are configurable (so you can disable them) but these which are enumerated below. diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 3b63cf8d03..72da3307dc 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -18,6 +18,8 @@ package org.apache.cassandra.config; +import java.util.concurrent.TimeUnit; + import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.service.FileSystemOwnershipCheck; @@ -265,6 +267,12 @@ public enum CassandraRelevantProperties TEST_IGNORE_SIGAR("cassandra.test.ignore_sigar", "false"), PAXOS_EXECUTE_ON_SELF("cassandra.paxos.use_self_execution", "true"), + /** property for the rate of the scheduled task that monitors disk usage */ + DISK_USAGE_MONITOR_INTERVAL_MS("cassandra.disk_usage.monitor_interval_ms", Long.toString(TimeUnit.SECONDS.toMillis(30))), + + /** property for the interval on which the repeated client warnings and diagnostic events about disk usage 
are ignored */ + DISK_USAGE_NOTIFY_INTERVAL_MS("cassandra.disk_usage.notify_interval_ms", Long.toString(TimeUnit.MINUTES.toMillis(30))), + // for specific tests ORG_APACHE_CASSANDRA_CONF_CASSANDRA_RELEVANT_PROPERTIES_TEST("org.apache.cassandra.conf.CassandraRelevantPropertiesTest"), ORG_APACHE_CASSANDRA_DB_VIRTUAL_SYSTEM_PROPERTIES_TABLE_TEST("org.apache.cassandra.db.virtual.SystemPropertiesTableTest"), diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 58cfbfddfd..7f6c926d08 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -783,24 +783,22 @@ public class Config public volatile SubnetGroups client_error_reporting_exclusions = new SubnetGroups(); public volatile SubnetGroups internode_error_reporting_exclusions = new SubnetGroups(); - public static final int DISABLED_GUARDRAIL = -1; - public static final DataStorageSpec DISABLED_SIZE_GUARDRAIL = DataStorageSpec.inBytes(0); - public volatile int keyspaces_warn_threshold = DISABLED_GUARDRAIL; - public volatile int keyspaces_fail_threshold = DISABLED_GUARDRAIL; - public volatile int tables_warn_threshold = DISABLED_GUARDRAIL; - public volatile int tables_fail_threshold = DISABLED_GUARDRAIL; - public volatile int columns_per_table_warn_threshold = DISABLED_GUARDRAIL; - public volatile int columns_per_table_fail_threshold = DISABLED_GUARDRAIL; - public volatile int secondary_indexes_per_table_warn_threshold = DISABLED_GUARDRAIL; - public volatile int secondary_indexes_per_table_fail_threshold = DISABLED_GUARDRAIL; - public volatile int materialized_views_per_table_warn_threshold = DISABLED_GUARDRAIL; - public volatile int materialized_views_per_table_fail_threshold = DISABLED_GUARDRAIL; - public volatile int page_size_warn_threshold = DISABLED_GUARDRAIL; - public volatile int page_size_fail_threshold = DISABLED_GUARDRAIL; - public volatile int partition_keys_in_select_warn_threshold = 
DISABLED_GUARDRAIL; - public volatile int partition_keys_in_select_fail_threshold = DISABLED_GUARDRAIL; - public volatile int in_select_cartesian_product_warn_threshold = DISABLED_GUARDRAIL; - public volatile int in_select_cartesian_product_fail_threshold = DISABLED_GUARDRAIL; + public volatile int keyspaces_warn_threshold = -1; + public volatile int keyspaces_fail_threshold = -1; + public volatile int tables_warn_threshold = -1; + public volatile int tables_fail_threshold = -1; + public volatile int columns_per_table_warn_threshold = -1; + public volatile int columns_per_table_fail_threshold = -1; + public volatile int secondary_indexes_per_table_warn_threshold = -1; + public volatile int secondary_indexes_per_table_fail_threshold = -1; + public volatile int materialized_views_per_table_warn_threshold = -1; + public volatile int materialized_views_per_table_fail_threshold = -1; + public volatile int page_size_warn_threshold = -1; + public volatile int page_size_fail_threshold = -1; + public volatile int partition_keys_in_select_warn_threshold = -1; + public volatile int partition_keys_in_select_fail_threshold = -1; + public volatile int in_select_cartesian_product_warn_threshold = -1; + public volatile int in_select_cartesian_product_fail_threshold = -1; public volatile Set<String> table_properties_warned = Collections.emptySet(); public volatile Set<String> table_properties_ignored = Collections.emptySet(); public volatile Set<String> table_properties_disallowed = Collections.emptySet(); @@ -814,12 +812,15 @@ public class Config public volatile boolean uncompressed_tables_enabled = true; public volatile boolean compact_tables_enabled = true; public volatile boolean read_before_write_list_operations_enabled = true; - public volatile DataStorageSpec collection_size_warn_threshold = DISABLED_SIZE_GUARDRAIL; - public volatile DataStorageSpec collection_size_fail_threshold = DISABLED_SIZE_GUARDRAIL; - public volatile int items_per_collection_warn_threshold = 
DISABLED_GUARDRAIL; - public volatile int items_per_collection_fail_threshold = DISABLED_GUARDRAIL; - public volatile int fields_per_udt_warn_threshold = DISABLED_GUARDRAIL; - public volatile int fields_per_udt_fail_threshold = DISABLED_GUARDRAIL; + public volatile DataStorageSpec collection_size_warn_threshold = null; + public volatile DataStorageSpec collection_size_fail_threshold = null; + public volatile int items_per_collection_warn_threshold = -1; + public volatile int items_per_collection_fail_threshold = -1; + public volatile int fields_per_udt_warn_threshold = -1; + public volatile int fields_per_udt_fail_threshold = -1; + public volatile int data_disk_usage_percentage_warn_threshold = -1; + public volatile int data_disk_usage_percentage_fail_threshold = -1; + public volatile DataStorageSpec data_disk_usage_max_disk_size = null; public volatile DurationSpec streaming_state_expires = DurationSpec.inDays(3); public volatile DataStorageSpec streaming_state_size = DataStorageSpec.inMebibytes(40); diff --git a/src/java/org/apache/cassandra/config/DataStorageSpec.java b/src/java/org/apache/cassandra/config/DataStorageSpec.java index eeafe2ed8d..93224b3245 100644 --- a/src/java/org/apache/cassandra/config/DataStorageSpec.java +++ b/src/java/org/apache/cassandra/config/DataStorageSpec.java @@ -134,6 +134,17 @@ public class DataStorageSpec return new DataStorageSpec(mebibytes, MEBIBYTES); } + /** + * Creates a {@code DataStorageSpec} of the specified amount of gibibytes. + * + * @param gibibytes the amount of gibibytes + * @return a {@code DataStorageSpec} + */ + public static DataStorageSpec inGibibytes(long gibibytes) + { + return new DataStorageSpec(gibibytes, GIBIBYTES); + } + /** * @return the data storage unit. 
*/ @@ -421,4 +432,4 @@ public class DataStorageSpec throw new AbstractMethodError(); } } -} \ No newline at end of file +} diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java b/src/java/org/apache/cassandra/config/GuardrailsOptions.java index b00075ab21..b73cc195bf 100644 --- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java +++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java @@ -18,12 +18,16 @@ package org.apache.cassandra.config; +import java.math.BigInteger; import java.util.Collections; import java.util.Set; import java.util.function.Consumer; import java.util.function.Supplier; +import javax.annotation.Nullable; + import com.google.common.collect.Sets; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +35,7 @@ import org.apache.cassandra.cql3.statements.schema.TableAttributes; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.guardrails.Guardrails; import org.apache.cassandra.db.guardrails.GuardrailsConfig; +import org.apache.cassandra.service.disk.usage.DiskUsageMonitor; import static java.lang.String.format; import static java.util.stream.Collectors.toSet; @@ -74,9 +79,11 @@ public class GuardrailsOptions implements GuardrailsConfig config.read_consistency_levels_disallowed = validateConsistencyLevels(config.read_consistency_levels_disallowed, "read_consistency_levels_disallowed"); config.write_consistency_levels_warned = validateConsistencyLevels(config.write_consistency_levels_warned, "write_consistency_levels_warned"); config.write_consistency_levels_disallowed = validateConsistencyLevels(config.write_consistency_levels_disallowed, "write_consistency_levels_disallowed"); - validateSizeThreshold(config.collection_size_warn_threshold, config.collection_size_fail_threshold, "collection_size"); + validateSizeThreshold(config.collection_size_warn_threshold, config.collection_size_fail_threshold, false, "collection_size"); 
validateIntThreshold(config.items_per_collection_warn_threshold, config.items_per_collection_fail_threshold, "items_per_collection"); validateIntThreshold(config.fields_per_udt_warn_threshold, config.fields_per_udt_fail_threshold, "fields_per_udt"); + validatePercentageThreshold(config.data_disk_usage_percentage_warn_threshold, config.data_disk_usage_percentage_fail_threshold, "data_disk_usage_percentage"); + validateDataDiskUsageMaxDiskSize(config.data_disk_usage_max_disk_size); } @Override @@ -460,20 +467,23 @@ public class GuardrailsOptions implements GuardrailsConfig x -> config.write_consistency_levels_disallowed = x); } + @Override + @Nullable public DataStorageSpec getCollectionSizeWarnThreshold() { return config.collection_size_warn_threshold; } @Override + @Nullable public DataStorageSpec getCollectionSizeFailThreshold() { return config.collection_size_fail_threshold; } - public void setCollectionSizeThreshold(DataStorageSpec warn, DataStorageSpec fail) + public void setCollectionSizeThreshold(@Nullable DataStorageSpec warn, @Nullable DataStorageSpec fail) { - validateSizeThreshold(warn, fail, "collection_size"); + validateSizeThreshold(warn, fail, false, "collection_size"); updatePropertyWithLogging("collection_size_warn_threshold", warn, () -> config.collection_size_warn_threshold, @@ -534,10 +544,49 @@ public class GuardrailsOptions implements GuardrailsConfig x -> config.fields_per_udt_fail_threshold = x); } + public int getDataDiskUsagePercentageWarnThreshold() + { + return config.data_disk_usage_percentage_warn_threshold; + } + + @Override + public int getDataDiskUsagePercentageFailThreshold() + { + return config.data_disk_usage_percentage_fail_threshold; + } + + public void setDataDiskUsagePercentageThreshold(int warn, int fail) + { + validatePercentageThreshold(warn, fail, "data_disk_usage_percentage"); + updatePropertyWithLogging("data_disk_usage_percentage_warn_threshold", + warn, + () -> config.data_disk_usage_percentage_warn_threshold, + x -> 
config.data_disk_usage_percentage_warn_threshold = x); + updatePropertyWithLogging("data_disk_usage_percentage_fail_threshold", + fail, + () -> config.data_disk_usage_percentage_fail_threshold, + x -> config.data_disk_usage_percentage_fail_threshold = x); + } + + @Override + public DataStorageSpec getDataDiskUsageMaxDiskSize() + { + return config.data_disk_usage_max_disk_size; + } + + public void setDataDiskUsageMaxDiskSize(@Nullable DataStorageSpec diskSize) + { + validateDataDiskUsageMaxDiskSize(diskSize); + updatePropertyWithLogging("data_disk_usage_max_disk_size", + diskSize, + () -> config.data_disk_usage_max_disk_size, + x -> config.data_disk_usage_max_disk_size = x); + } + private static <T> void updatePropertyWithLogging(String propertyName, T newValue, Supplier<T> getter, Consumer<T> setter) { T oldValue = getter.get(); - if (!newValue.equals(oldValue)) + if (newValue == null || !newValue.equals(oldValue)) { setter.accept(newValue); logger.info("Updated {} from {} to {}", propertyName, oldValue, newValue); @@ -546,7 +595,7 @@ public class GuardrailsOptions implements GuardrailsConfig private static void validatePositiveNumeric(long value, long maxValue, String name) { - if (value == Config.DISABLED_GUARDRAIL) + if (value == -1) return; if (value > maxValue) @@ -555,14 +604,17 @@ public class GuardrailsOptions implements GuardrailsConfig if (value == 0) throw new IllegalArgumentException(format("Invalid value for %s: 0 is not allowed; " + - "if attempting to disable use %d", - name, Config.DISABLED_GUARDRAIL)); + "if attempting to disable use -1", name)); // We allow -1 as a general "disabling" flag. But reject anything lower to avoid mistakes. 
if (value <= 0) throw new IllegalArgumentException(format("Invalid value %d for %s: negative values are not allowed, " + - "outside of %d which disables the guardrail", - value, name, Config.DISABLED_GUARDRAIL)); + "outside of -1 which disables the guardrail", value, name)); + } + + private static void validatePercentage(long value, String name) + { + validatePositiveNumeric(value, 100, name); } private static void validateIntThreshold(int warn, int fail, String name) @@ -572,9 +624,16 @@ public class GuardrailsOptions implements GuardrailsConfig validateWarnLowerThanFail(warn, fail, name); } + private static void validatePercentageThreshold(int warn, int fail, String name) + { + validatePercentage(warn, name + "_warn_threshold"); + validatePercentage(fail, name + "_fail_threshold"); + validateWarnLowerThanFail(warn, fail, name); + } + private static void validateWarnLowerThanFail(long warn, long fail, String name) { - if (warn == Config.DISABLED_GUARDRAIL || fail == Config.DISABLED_GUARDRAIL) + if (warn == -1 || fail == -1) return; if (fail < warn) @@ -582,9 +641,27 @@ public class GuardrailsOptions implements GuardrailsConfig "than the fail threshold %d", warn, name, fail)); } - private static void validateSizeThreshold(DataStorageSpec warn, DataStorageSpec fail, String name) + private static void validateSize(DataStorageSpec size, boolean allowZero, String name) + { + if (size == null) + return; + + if (!allowZero && size.toBytes() == 0) + throw new IllegalArgumentException(format("Invalid value for %s: 0 is not allowed; " + + "if attempting to disable use an empty value", + name)); + } + + private static void validateSizeThreshold(DataStorageSpec warn, DataStorageSpec fail, boolean allowZero, String name) { - if (warn.equals(Config.DISABLED_SIZE_GUARDRAIL) || fail.equals(Config.DISABLED_SIZE_GUARDRAIL)) + validateSize(warn, allowZero, name + "_warn_threshold"); + validateSize(fail, allowZero, name + "_fail_threshold"); + validateWarnLowerThanFail(warn, fail, 
name); + } + + private static void validateWarnLowerThanFail(DataStorageSpec warn, DataStorageSpec fail, String name) + { + if (warn == null || fail == null) return; if (fail.toBytes() < warn.toBytes()) @@ -615,4 +692,24 @@ public class GuardrailsOptions implements GuardrailsConfig return consistencyLevels.isEmpty() ? Collections.emptySet() : Sets.immutableEnumSet(consistencyLevels); } + + private static void validateDataDiskUsageMaxDiskSize(DataStorageSpec maxDiskSize) + { + if (maxDiskSize == null) + return; + + validateSize(maxDiskSize, false, "data_disk_usage_max_disk_size"); + + BigInteger diskSize = DiskUsageMonitor.dataDirectoriesGroupedByFileStore() + .keys() + .stream() + .map(DiskUsageMonitor::totalSpace) + .map(BigInteger::valueOf) + .reduce(BigInteger.ZERO, BigInteger::add); + + if (diskSize.compareTo(BigInteger.valueOf(maxDiskSize.toBytes())) < 0) + throw new IllegalArgumentException(format("Invalid value for data_disk_usage_max_disk_size: " + + "%s specified, but only %s are actually available on disk", + maxDiskSize, DataStorageSpec.inBytes(diskSize.longValue()))); + } } diff --git a/src/java/org/apache/cassandra/cql3/QueryOptions.java b/src/java/org/apache/cassandra/cql3/QueryOptions.java index 4193c4e15a..5fcaf06786 100644 --- a/src/java/org/apache/cassandra/cql3/QueryOptions.java +++ b/src/java/org/apache/cassandra/cql3/QueryOptions.java @@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList; import io.netty.buffer.ByteBuf; -import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.schema.ColumnMetadata; @@ -282,13 +281,13 @@ public abstract class QueryOptions @Override public long getCoordinatorReadSizeWarnThresholdBytes() { - return Config.DISABLED_GUARDRAIL; + return -1; } @Override public long getCoordinatorReadSizeFailThresholdBytes() { - return Config.DISABLED_GUARDRAIL; + return -1; } } @@ -299,8 +298,8 @@ public 
abstract class QueryOptions public DefaultReadThresholds(DataStorageSpec warnThreshold, DataStorageSpec abortThreshold) { - this.warnThresholdBytes = warnThreshold == null ? Config.DISABLED_GUARDRAIL : warnThreshold.toBytes(); - this.abortThresholdBytes = abortThreshold == null ? Config.DISABLED_GUARDRAIL : abortThreshold.toBytes(); + this.warnThresholdBytes = warnThreshold == null ? -1 : warnThreshold.toBytes(); + this.abortThresholdBytes = abortThreshold == null ? -1 : abortThreshold.toBytes(); } @Override diff --git a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java index b42357835a..22566b26d7 100644 --- a/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java +++ b/src/java/org/apache/cassandra/cql3/selection/ResultSetBuilder.java @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import org.apache.cassandra.config.Config; import org.apache.cassandra.cql3.ResultSet; import org.apache.cassandra.cql3.ResultSet.ResultMetadata; import org.apache.cassandra.cql3.selection.Selection.Selectors; @@ -76,7 +75,7 @@ public final class ResultSetBuilder public boolean shouldWarn(long thresholdBytes) { - if (thresholdBytes != Config.DISABLED_GUARDRAIL &&!sizeWarningEmitted && size > thresholdBytes) + if (thresholdBytes != -1 &&!sizeWarningEmitted && size > thresholdBytes) { sizeWarningEmitted = true; return true; @@ -86,7 +85,7 @@ public final class ResultSetBuilder public boolean shouldReject(long thresholdBytes) { - return thresholdBytes != Config.DISABLED_GUARDRAIL && size > thresholdBytes; + return thresholdBytes != -1 && size > thresholdBytes; } public long getSize() diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 89ece02c4d..61e4934864 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -411,8 +411,12 @@ public class BatchStatement implements CQLStatement if (options.getSerialConsistency() == null) throw new InvalidRequestException("Invalid empty serial consistency level"); + ClientState clientState = queryState.getClientState(); Guardrails.writeConsistencyLevels.guard(EnumSet.of(options.getConsistency(), options.getSerialConsistency()), - queryState.getClientState()); + clientState); + + for (int i = 0; i < statements.size(); i++ ) + statements.get(i).validateDiskUsage(options.forStatement(i), clientState); if (hasConditions) return executeWithConditions(options, queryState, queryStartNanoTime); @@ -420,7 +424,7 @@ public class BatchStatement implements CQLStatement if (updatesVirtualTables) executeInternalWithoutCondition(queryState, options, queryStartNanoTime); else - executeWithoutConditions(getMutations(queryState.getClientState(), options, false, timestamp, nowInSeconds, queryStartNanoTime), + executeWithoutConditions(getMutations(clientState, options, false, timestamp, nowInSeconds, queryStartNanoTime), options.getConsistency(), queryStartNanoTime); return new ResultMessage.Void(); diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 354f739bb1..ab36ec971b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.Permission; import org.apache.cassandra.db.guardrails.Guardrails; import org.apache.cassandra.db.marshal.ValueAccessor; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; import 
org.apache.cassandra.schema.TableMetadata; @@ -51,6 +54,7 @@ import org.apache.cassandra.exceptions.*; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; import org.apache.cassandra.service.paxos.Ballot; import org.apache.cassandra.service.paxos.BallotGenerator; import org.apache.cassandra.service.paxos.Commit.Proposal; @@ -277,6 +281,25 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa Guardrails.userTimestampsEnabled.ensureEnabled(state); } + public void validateDiskUsage(QueryOptions options, ClientState state) + { + // reject writes if any replica exceeds disk usage failure limit or warn if it exceeds warn limit + if (Guardrails.replicaDiskUsage.enabled(state) && DiskUsageBroadcaster.instance.hasStuffedOrFullNode()) + { + Keyspace keyspace = Keyspace.open(keyspace()); + + for (ByteBuffer key : buildPartitionKeyNames(options, state)) + { + Token token = metadata().partitioner.getToken(key); + + for (Replica replica : ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token).all()) + { + Guardrails.replicaDiskUsage.guard(replica.endpoint(), state); + } + } + } + } + public RegularAndStaticColumns updatedColumns() { return updatedColumns; @@ -480,6 +503,8 @@ public abstract class ModificationStatement implements CQLStatement.SingleKeyspa else cl.validateForWrite(); + validateDiskUsage(options, queryState.getClientState()); + List<? extends IMutation> mutations = getMutations(queryState.getClientState(), options, diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java index bcef41bc7d..972ba6d3cd 100644 --- a/src/java/org/apache/cassandra/db/Directories.java +++ b/src/java/org/apache/cassandra/db/Directories.java @@ -640,6 +640,11 @@ public class Directories return availableSpace > 0 ? 
availableSpace : 0; } + public long getRawSize() + { + return FileUtils.folderSize(location); + } + @Override public boolean equals(Object o) { diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 1efb086b40..ef70588a45 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -666,8 +666,8 @@ public abstract class ReadCommand extends AbstractReadQuery DataStorageSpec failThreshold = DatabaseDescriptor.getLocalReadSizeFailThreshold(); if (!shouldTrackSize(warnThreshold, failThreshold)) return iterator; - final long warnBytes = warnThreshold == null ? Config.DISABLED_GUARDRAIL : warnThreshold.toBytes(); - final long failBytes = failThreshold == null ? Config.DISABLED_GUARDRAIL : failThreshold.toBytes(); + final long warnBytes = warnThreshold == null ? -1 : warnThreshold.toBytes(); + final long failBytes = failThreshold == null ? -1 : failThreshold.toBytes(); class QuerySizeTracking extends Transformation<UnfilteredRowIterator> { private long sizeInBytes = 0; @@ -709,7 +709,7 @@ public abstract class ReadCommand extends AbstractReadQuery private void addSize(long size) { this.sizeInBytes += size; - if (failBytes != Config.DISABLED_GUARDRAIL && this.sizeInBytes >= failBytes) + if (failBytes != -1 && this.sizeInBytes >= failBytes) { String msg = String.format("Query %s attempted to read %d bytes but max allowed is %s; query aborted (see local_read_size_fail_threshold)", ReadCommand.this.toCQLString(), this.sizeInBytes, failThreshold); @@ -718,7 +718,7 @@ public abstract class ReadCommand extends AbstractReadQuery MessageParams.add(ParamType.LOCAL_READ_SIZE_FAIL, this.sizeInBytes); throw new LocalReadSizeTooLargeException(msg); } - else if (warnBytes != Config.DISABLED_GUARDRAIL && this.sizeInBytes >= warnBytes) + else if (warnBytes != -1 && this.sizeInBytes >= warnBytes) { MessageParams.add(ParamType.LOCAL_READ_SIZE_WARN, 
this.sizeInBytes); } } diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java index 6609760109..c058f10784 100644 --- a/src/java/org/apache/cassandra/db/guardrails/Guardrail.java +++ b/src/java/org/apache/cassandra/db/guardrails/Guardrail.java @@ -29,6 +29,7 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.NoSpamLogger; /** @@ -50,11 +51,32 @@ public abstract class Guardrail /** A name identifying the guardrail (mainly for shipping with diagnostic events). */ public final String name; + /** Minimum logging and triggering interval to avoid spamming downstream. */ + private long minNotifyIntervalInMs = 0; + + /** Time of last warning in milliseconds. */ + private volatile long lastWarnInMs = 0; + + /** Time of last failure in milliseconds. */ + private volatile long lastFailInMs = 0; + Guardrail(String name) { this.name = name; } + /** + * Checks whether this guardrail is enabled or not when the check is done for a background operation that is not + * associated to a specific {@link ClientState}, such as compaction or other background processes. Operations that + * are associated to a {@link ClientState}, such as CQL queries, should use {@link Guardrail#enabled(ClientState)}. + * + * @return {@code true} if this guardrail is enabled, {@code false} otherwise. + */ + public boolean enabled() + { + return enabled(null); + } + + /** * Checks whether this guardrail is enabled or not. This will be enabled if the database is initialized and the * authenticated user (if specified) is not system nor superuser. 
@@ -75,6 +97,9 @@ public abstract class Guardrail protected void warn(String message, String redactedMessage) { + if (skipNotifying(true)) + return; + message = decorateMessage(message); logger.warn(message); @@ -95,13 +120,16 @@ public abstract class Guardrail { message = decorateMessage(message); - logger.error(message); - // Note that ClientWarn will simply ignore the message if we're not running this as part of a user query - // (the internal "state" will be null) - ClientWarn.instance.warn(message); - // Similarly, tracing will also ignore the message if we're not running tracing on the current thread. - Tracing.trace(message); - GuardrailsDiagnostics.failed(name, decorateMessage(redactedMessage)); + if (!skipNotifying(false)) + { + logger.error(message); + // Note that ClientWarn will simply ignore the message if we're not running this as part of a user query + // (the internal "state" will be null) + ClientWarn.instance.warn(message); + // Similarly, tracing will also ignore the message if we're not running tracing on the current thread. + Tracing.trace(message); + GuardrailsDiagnostics.failed(name, decorateMessage(redactedMessage)); + } if (state != null) throw new GuardrailViolatedException(message); @@ -113,4 +141,54 @@ public abstract class Guardrail // Add a prefix to error message so user knows what threw the warning or cause the failure return String.format("Guardrail %s violated: %s", name, message); } + + /** + * Note: this method is not thread safe and should only be used during guardrail initialization + * + * @param minNotifyIntervalInMs frequency of logging and triggering listener to avoid spamming, + * default 0 means always log and trigger listeners. 
+ * @return current guardrail + */ + Guardrail minNotifyIntervalInMs(long minNotifyIntervalInMs) + { + assert minNotifyIntervalInMs >= 0; + this.minNotifyIntervalInMs = minNotifyIntervalInMs; + return this; + } + + /** + * reset last notify time to make sure it will notify downstream when {@link this#warn(String, String)} + * or {@link this#fail(String, ClientState)} is called next time. + */ + @VisibleForTesting + void resetLastNotifyTime() + { + lastFailInMs = 0; + lastWarnInMs = 0; + } + + /** + * @return true if guardrail should not log message and trigger listeners; otherwise, update lastWarnInMs or + * lastFailInMs respectively. + */ + private boolean skipNotifying(boolean isWarn) + { + if (minNotifyIntervalInMs == 0) + return false; + + long nowInMs = Clock.Global.currentTimeMillis(); + long timeElapsedInMs = nowInMs - (isWarn ? lastWarnInMs : lastFailInMs); + + boolean skip = timeElapsedInMs < minNotifyIntervalInMs; + + if (!skip) + { + if (isWarn) + lastWarnInMs = nowInMs; + else + lastFailInMs = nowInMs; + } + + return skip; + } } diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java index cc54b279a4..d7ec81e83b 100644 --- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java +++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java @@ -22,15 +22,19 @@ import java.util.Collections; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; +import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.StringUtils; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.GuardrailsOptions; import org.apache.cassandra.db.ConsistencyLevel; +import 
org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; import org.apache.cassandra.utils.MBeanWrapper; import static java.lang.String.format; @@ -42,7 +46,7 @@ public final class Guardrails implements GuardrailsMBean { public static final String MBEAN_NAME = "org.apache.cassandra.db:type=Guardrails"; - private static final GuardrailsConfigProvider CONFIG_PROVIDER = GuardrailsConfigProvider.instance; + public static final GuardrailsConfigProvider CONFIG_PROVIDER = GuardrailsConfigProvider.instance; private static final GuardrailsOptions DEFAULT_CONFIG = DatabaseDescriptor.getGuardrailsConfig(); @VisibleForTesting @@ -201,11 +205,11 @@ public final class Guardrails implements GuardrailsMBean state -> CONFIG_PROVIDER.getOrCreate(state).getInSelectCartesianProductWarnThreshold(), state -> CONFIG_PROVIDER.getOrCreate(state).getInSelectCartesianProductFailThreshold(), (isWarning, what, value, threshold) -> - isWarning ? format("The cartesian product of the IN restrictions on %s produces %d values, " + + isWarning ? 
format("The cartesian product of the IN restrictions on %s produces %s values, " + "this exceeds warning threshold of %s.", what, value, threshold) : format("Aborting query because the cartesian product of the IN restrictions on %s " + - "produces %d values, this exceeds fail threshold of %s.", + "produces %s values, this exceeds fail threshold of %s.", what, value, threshold)); /** @@ -233,8 +237,8 @@ public final class Guardrails implements GuardrailsMBean */ public static final Threshold collectionSize = new Threshold("collection_size", - state -> CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold().toBytes(), - state -> CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold().toBytes(), + state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeWarnThreshold()), + state -> sizeToBytes(CONFIG_PROVIDER.getOrCreate(state).getCollectionSizeFailThreshold()), (isWarning, what, value, threshold) -> isWarning ? format("Detected collection %s of size %s, this exceeds the warning threshold of %s.", what, value, threshold) @@ -267,6 +271,42 @@ public final class Guardrails implements GuardrailsMBean : format("User types cannot have more than %s columns, but %s provided for user type %s.", threshold, value, what)); + /** + * Guardrail on the data disk usage on the local node, used by a periodic task to calculate and propagate that status. + * See {@link org.apache.cassandra.service.disk.usage.DiskUsageMonitor} and {@link DiskUsageBroadcaster}. + */ + public static final PercentageThreshold localDataDiskUsage = + new PercentageThreshold("local_data_disk_usage", + state -> CONFIG_PROVIDER.getOrCreate(state).getDataDiskUsagePercentageWarnThreshold(), + state -> CONFIG_PROVIDER.getOrCreate(state).getDataDiskUsagePercentageFailThreshold(), + (isWarning, what, value, threshold) -> + isWarning ? 
format("Local data disk usage %s(%s) exceeds warning threshold of %s", + value, what, threshold) + : format("Local data disk usage %s(%s) exceeds failure threshold of %s, " + + "will stop accepting writes", + value, what, threshold)); + + /** + * Guardrail on the data disk usage on replicas, used at write time to verify the status of the involved replicas. + * See {@link org.apache.cassandra.service.disk.usage.DiskUsageMonitor} and {@link DiskUsageBroadcaster}. + */ + public static final Predicates<InetAddressAndPort> replicaDiskUsage = + new Predicates<>("replica_disk_usage", + state -> DiskUsageBroadcaster.instance::isStuffed, + state -> DiskUsageBroadcaster.instance::isFull, + // not using `value` because it represents replica address which should be hidden from client. + (isWarning, value) -> + isWarning ? "Replica disk usage exceeds warning threshold" + : "Write request failed because disk usage exceeds failure threshold"); + + static + { + // Avoid spamming with notifications about stuffed/full disks + long minNotifyInterval = CassandraRelevantProperties.DISK_USAGE_NOTIFY_INTERVAL_MS.getLong(); + localDataDiskUsage.minNotifyIntervalInMs(minNotifyInterval); + replicaDiskUsage.minNotifyIntervalInMs(minNotifyInterval); + } + private Guardrails() { MBeanWrapper.instance.registerMBean(this, MBEAN_NAME); @@ -557,22 +597,24 @@ public final class Guardrails implements GuardrailsMBean DEFAULT_CONFIG.setPartitionKeysInSelectThreshold(warn, fail); } - public long getCollectionSizeWarnThresholdInKiB() + @Override + @Nullable + public String getCollectionSizeWarnThreshold() { - return DEFAULT_CONFIG.getCollectionSizeWarnThreshold().toKibibytes(); + return sizeToString(DEFAULT_CONFIG.getCollectionSizeWarnThreshold()); } @Override - public long getCollectionSizeFailThresholdInKiB() + @Nullable + public String getCollectionSizeFailThreshold() { - return DEFAULT_CONFIG.getCollectionSizeFailThreshold().toKibibytes(); + return 
sizeToString(DEFAULT_CONFIG.getCollectionSizeFailThreshold()); } @Override - public void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB) + public void setCollectionSizeThreshold(@Nullable String warnSize, @Nullable String failSize) { - DEFAULT_CONFIG.setCollectionSizeThreshold(DataStorageSpec.inKibibytes(warnInKiB), - DataStorageSpec.inKibibytes(failInKiB)); + DEFAULT_CONFIG.setCollectionSizeThreshold(sizeFromString(warnSize), sizeFromString(failSize)); } @Override @@ -725,6 +767,37 @@ public final class Guardrails implements GuardrailsMBean DEFAULT_CONFIG.setFieldsPerUDTThreshold(warn, fail); } + @Override + public int getDataDiskUsagePercentageWarnThreshold() + { + return DEFAULT_CONFIG.getDataDiskUsagePercentageWarnThreshold(); + } + + @Override + public int getDataDiskUsagePercentageFailThreshold() + { + return DEFAULT_CONFIG.getDataDiskUsagePercentageFailThreshold(); + } + + @Override + public void setDataDiskUsagePercentageThreshold(int warn, int fail) + { + DEFAULT_CONFIG.setDataDiskUsagePercentageThreshold(warn, fail); + } + + @Override + @Nullable + public String getDataDiskUsageMaxDiskSize() + { + return sizeToString(DEFAULT_CONFIG.getDataDiskUsageMaxDiskSize()); + } + + @Override + public void setDataDiskUsageMaxDiskSize(@Nullable String size) + { + DEFAULT_CONFIG.setDataDiskUsageMaxDiskSize(sizeFromString(size)); + } + private static String toCSV(Set<String> values) { return values == null || values.isEmpty() ? "" : String.join(",", values); @@ -758,4 +831,19 @@ public final class Guardrails implements GuardrailsMBean return null; return set.stream().map(ConsistencyLevel::valueOf).collect(Collectors.toSet()); } + + private static Long sizeToBytes(@Nullable DataStorageSpec size) + { + return size == null ? -1 : size.toBytes(); + } + + private static String sizeToString(@Nullable DataStorageSpec size) + { + return size == null ? 
null : size.toString(); + } + + private static DataStorageSpec sizeFromString(@Nullable String size) + { + return StringUtils.isEmpty(size) ? null : new DataStorageSpec(size); + } } diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java index 08b3d5677b..562509b7dd 100644 --- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java +++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java @@ -20,6 +20,8 @@ package org.apache.cassandra.db.guardrails; import java.util.Set; +import javax.annotation.Nullable; + import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.db.ConsistencyLevel; @@ -200,14 +202,16 @@ public interface GuardrailsConfig */ Set<ConsistencyLevel> getWriteConsistencyLevelsDisallowed(); - /* + /** * @return The threshold to warn when encountering a collection with larger data size than threshold. */ + @Nullable DataStorageSpec getCollectionSizeWarnThreshold(); /** * @return The threshold to prevent collections with larger data size than threshold. */ + @Nullable DataStorageSpec getCollectionSizeFailThreshold(); /** @@ -229,4 +233,23 @@ public interface GuardrailsConfig * @return The threshold to fail when creating a UDT with more fields than threshold. */ int getFieldsPerUDTFailThreshold(); + + /** + * @return The threshold to warn when local disk usage percentage exceeds that threshold. + * Allowed values are in the range {@code [1, 100]}, and -1 means disabled. + */ + int getDataDiskUsagePercentageWarnThreshold(); + + /** + * @return The threshold to fail when local disk usage percentage exceeds that threshold. + * Allowed values are in the range {@code [1, 100]}, and -1 means disabled. + */ + int getDataDiskUsagePercentageFailThreshold(); + + /** + * @return The max disk size of the data directories when calculating disk usage thresholds, {@code null} means + * disabled. 
+ */ + @Nullable + DataStorageSpec getDataDiskUsageMaxDiskSize(); } diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java index 771db67ce6..215b953b89 100644 --- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java +++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java @@ -19,6 +19,7 @@ package org.apache.cassandra.db.guardrails; import java.util.Set; +import javax.annotation.Nullable; /** * JMX entrypoint for updating the default guardrails configuration parsed from {@code cassandra.yaml}. @@ -110,6 +111,7 @@ public interface GuardrailsMBean /** * Enables or disables the ability to create secondary indexes + * * @param enabled */ void setSecondaryIndexesEnabled(boolean enabled); @@ -401,20 +403,30 @@ public interface GuardrailsMBean void setWriteConsistencyLevelsDisallowedCSV(String consistencyLevels); /** - * @return The threshold to warn when encountering larger size of collection data than threshold, in KiB. + * @return The threshold to warn when encountering larger size of collection data than threshold, as a string + * formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code null} value + * means that the threshold is disabled. */ - long getCollectionSizeWarnThresholdInKiB(); + @Nullable + String getCollectionSizeWarnThreshold(); /** - * @return The threshold to prevent collections with larger data size than threshold, in KiB. + * @return The threshold to prevent collections with larger data size than threshold, as a string formatted as in, + * for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code null} value means that the + * threshold is disabled. */ - long getCollectionSizeFailThresholdInKiB(); + @Nullable + String getCollectionSizeFailThreshold(); /** - * @param warnInKiB The threshold to warn when encountering larger size of collection data than threshold, in KiB. 
- * @param failInKiB The threshold to prevent collections with larger data size than threshold, in KiB. + * @param warnSize The threshold to warn when encountering larger size of collection data than threshold, as a + * string formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. + * A {@code null} value means disabled. + * @param failSize The threshold to prevent collections with larger data size than threshold, as a string formatted + * as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code null} + * value means disabled. */ - void setCollectionSizeThresholdInKiB(long warnInKiB, long failInKiB); + void setCollectionSizeThreshold(@Nullable String warnSize, @Nullable String failSize); /** * @return The threshold to warn when encountering more elements in a collection than threshold. @@ -447,4 +459,39 @@ public interface GuardrailsMBean * @param fail The threshold to prevent creating a UDT with more fields than threshold. -1 means disabled. */ void setFieldsPerUDTThreshold(int warn, int fail); + + /** + * @return The threshold to warn when local data disk usage percentage exceeds that threshold. + * Allowed values are in the range {@code [1, 100]}, and -1 means disabled. + */ + int getDataDiskUsagePercentageWarnThreshold(); + + /** + * @return The threshold to fail when local data disk usage percentage exceeds that threshold. + * Allowed values are in the range {@code [1, 100]}, and -1 means disabled. + */ + int getDataDiskUsagePercentageFailThreshold(); + + /** + * @param warn The threshold to warn when local disk usage percentage exceeds that threshold. + * Allowed values are in the range {@code [1, 100]}, and -1 means disabled. + * @param fail The threshold to fail when local disk usage percentage exceeds that threshold. + * Allowed values are in the range {@code [1, 100]}, and -1 means disabled. 
+ */ + public void setDataDiskUsagePercentageThreshold(int warn, int fail); + + /** + * @return The max disk size of the data directories when calculating disk usage thresholds, as a string formatted + * as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. A {@code null} value means + * disabled. + */ + @Nullable + String getDataDiskUsageMaxDiskSize(); + + /** + * @param size The max disk size of the data directories when calculating disk usage thresholds, as a string + * formatted as in, for example, {@code 10GiB}, {@code 20MiB}, {@code 30KiB} or {@code 40B}. + * A {@code null} value means disabled. + */ + void setDataDiskUsageMaxDiskSize(@Nullable String size); } diff --git a/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java b/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java new file mode 100644 index 0000000000..c08d02641b --- /dev/null +++ b/src/java/org/apache/cassandra/db/guardrails/PercentageThreshold.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.guardrails; + +import java.util.function.ToLongFunction; + +import org.apache.cassandra.service.ClientState; + +/** + * A {@link Threshold} guardrail whose values represent a percentage + * <p> + * This works exactly as a {@link Threshold}, but provides slightly more convenient error messages for percentage + */ +public class PercentageThreshold extends Threshold +{ + /** + * Creates a new threshold guardrail. + * + * @param name the identifying name of the guardrail + * @param warnThreshold a {@link ClientState}-based provider of the value above which a warning should be triggered. + * @param failThreshold a {@link ClientState}-based provider of the value above which the operation should be aborted. + * @param messageProvider a function to generate the warning or error message if the guardrail is triggered + */ + public PercentageThreshold(String name, + ToLongFunction<ClientState> warnThreshold, + ToLongFunction<ClientState> failThreshold, + ErrorMessageProvider messageProvider) + { + super(name, warnThreshold, failThreshold, messageProvider); + } + + @Override + protected String errMsg(boolean isWarning, String what, long value, long thresholdValue) + { + return messageProvider.createMessage(isWarning, + what, + String.format("%d%%", value), + String.format("%d%%", thresholdValue)); + } +} diff --git a/src/java/org/apache/cassandra/db/guardrails/Predicates.java b/src/java/org/apache/cassandra/db/guardrails/Predicates.java new file mode 100644 index 0000000000..13be9e9302 --- /dev/null +++ b/src/java/org/apache/cassandra/db/guardrails/Predicates.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.guardrails; + +import java.util.function.Function; +import java.util.function.Predicate; +import javax.annotation.Nullable; + +import org.apache.cassandra.service.ClientState; + +/** + * A guardrail based on two predicates. + * + * <p>A {@link Predicates} guardrail defines (up to) 2 predicates, one at which a warning is issued, and another one + * at which a failure is triggered. If failure is triggered, warning is skipped. + * + * @param <T> the type of the values to be tested against predicates. + */ +public class Predicates<T> extends Guardrail +{ + private final Function<ClientState, Predicate<T>> warnPredicate; + private final Function<ClientState, Predicate<T>> failurePredicate; + private final MessageProvider<T> messageProvider; + + /** + * A function used to build the warning or error message of a triggered {@link Predicates} guardrail. + */ + public interface MessageProvider<T> + { + /** + * Called when the guardrail is triggered to build the corresponding message. + * + * @param isWarning whether the trigger is a warning one; otherwise it is failure one. + * @param value the value that triggers guardrail. + */ + String createMessage(boolean isWarning, T value); + } + + /** + * Creates a new {@link Predicates} guardrail. 
+ * + * @param name the identifying name of the guardrail + * @param warnPredicate a {@link ClientState}-based predicate provider that is used to check if given value should trigger a warning. + * @param failurePredicate a {@link ClientState}-based predicate provider that is used to check if given value should trigger a failure. + * @param messageProvider a function to generate the warning or error message if the guardrail is triggered + */ + Predicates(String name, + Function<ClientState, Predicate<T>> warnPredicate, + Function<ClientState, Predicate<T>> failurePredicate, + MessageProvider<T> messageProvider) + { + super(name); + this.warnPredicate = warnPredicate; + this.failurePredicate = failurePredicate; + this.messageProvider = messageProvider; + } + + /** + * Apply the guardrail to the provided value, triggering a warning or failure if appropriate. + * + * @param value the value to check. + */ + public void guard(T value, @Nullable ClientState state) + { + if (!enabled(state)) + return; + + if (failurePredicate.apply(state).test(value)) + { + fail(messageProvider.createMessage(false, value), state); + } + else if (warnPredicate.apply(state).test(value)) + { + warn(messageProvider.createMessage(true, value)); + } + } +} diff --git a/src/java/org/apache/cassandra/db/guardrails/Threshold.java b/src/java/org/apache/cassandra/db/guardrails/Threshold.java index f88d1e0cbe..f7e4823448 100644 --- a/src/java/org/apache/cassandra/db/guardrails/Threshold.java +++ b/src/java/org/apache/cassandra/db/guardrails/Threshold.java @@ -35,7 +35,7 @@ public class Threshold extends Guardrail { private final ToLongFunction<ClientState> warnThreshold; private final ToLongFunction<ClientState> failThreshold; - private final ErrorMessageProvider messageProvider; + protected final ErrorMessageProvider messageProvider; /** * Creates a new threshold guardrail. 
@@ -56,12 +56,12 @@ public class Threshold extends Guardrail this.messageProvider = messageProvider; } - private String errMsg(boolean isWarning, String what, long value, long thresholdValue) + protected String errMsg(boolean isWarning, String what, long value, long thresholdValue) { return messageProvider.createMessage(isWarning, what, - value, - thresholdValue); + Long.toString(value), + Long.toString(thresholdValue)); } private String redactedErrMsg(boolean isWarning, long value, long thresholdValue) @@ -108,6 +108,16 @@ public class Threshold extends Guardrail return enabled(state) && (value > Math.min(failValue(state), warnValue(state))); } + public boolean warnsOn(long value, @Nullable ClientState state) + { + return enabled(state) && (value > warnValue(state) && value <= failValue(state)); + } + + public boolean failsOn(long value, @Nullable ClientState state) + { + return enabled(state) && (value > failValue(state)); + } + /** * Apply the guardrail to the provided value, warning or failing if appropriate. * @@ -163,6 +173,6 @@ public class Threshold extends Guardrail * @param value The value that triggered the guardrail (as a string). * @param threshold The threshold that was passed to trigger the guardrail (as a string). */ - String createMessage(boolean isWarning, String what, long value, long threshold); + String createMessage(boolean isWarning, String what, String value, String threshold); } } diff --git a/src/java/org/apache/cassandra/gms/ApplicationState.java b/src/java/org/apache/cassandra/gms/ApplicationState.java index 4e20d62048..c45d3c2602 100644 --- a/src/java/org/apache/cassandra/gms/ApplicationState.java +++ b/src/java/org/apache/cassandra/gms/ApplicationState.java @@ -56,6 +56,7 @@ public enum ApplicationState * a comma-separated list. **/ SSTABLE_VERSIONS, + DISK_USAGE, // DO NOT EDIT OR REMOVE PADDING STATES BELOW - only add new states above. 
See CASSANDRA-16484 X1, X2, diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java index 63e9487d7c..26644e17cc 100644 --- a/src/java/org/apache/cassandra/gms/VersionedValue.java +++ b/src/java/org/apache/cassandra/gms/VersionedValue.java @@ -172,6 +172,11 @@ public class VersionedValue implements Comparable<VersionedValue> return new VersionedValue(String.valueOf(load)); } + public VersionedValue diskUsage(String state) + { + return new VersionedValue(state); + } + public VersionedValue schema(UUID newVersion) { return new VersionedValue(newVersion.toString()); diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index 93447c2f44..186ee5abc4 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -398,7 +398,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional public static void guardCollectionSize(TableMetadata metadata, DecoratedKey partitionKey, Unfiltered unfiltered) { - if (!Guardrails.collectionSize.enabled(null) && !Guardrails.itemsPerCollection.enabled(null)) + if (!Guardrails.collectionSize.enabled() && !Guardrails.itemsPerCollection.enabled()) return; if (!unfiltered.isRow() || SchemaConstants.isSystemKeyspace(metadata.keyspace)) diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 0e8b2570fe..6562635dca 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -72,6 +72,7 @@ import org.apache.cassandra.fql.FullQueryLoggerOptionsCompositeData; import org.apache.cassandra.io.util.File; import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; import org.apache.cassandra.schema.Keyspaces; 
+import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.utils.concurrent.FutureCombiner; @@ -1034,6 +1035,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE gossipSnitchInfo(); Schema.instance.startSync(); LoadBroadcaster.instance.startBroadcasting(); + DiskUsageBroadcaster.instance.startBroadcasting(); HintsService.instance.startDispatch(); BatchlogManager.instance.start(); snapshotManager.start(); diff --git a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java new file mode 100644 index 0000000000..4504ac7f62 --- /dev/null +++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageBroadcaster.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.disk.usage; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.IEndpointStateChangeSubscriber; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.NoSpamLogger; + +/** + * Starts {@link DiskUsageMonitor} to monitor local disk usage state and broadcast new state via Gossip. + * At the same time, it caches cluster's disk usage state received via Gossip. + */ +public class DiskUsageBroadcaster implements IEndpointStateChangeSubscriber +{ + private static final Logger logger = LoggerFactory.getLogger(DiskUsageBroadcaster.class); + private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 10, TimeUnit.MINUTES); + + public static final DiskUsageBroadcaster instance = new DiskUsageBroadcaster(DiskUsageMonitor.instance); + + private final DiskUsageMonitor monitor; + private final ConcurrentMap<InetAddressAndPort, DiskUsageState> usageInfo = new ConcurrentHashMap<>(); + private volatile boolean hasStuffedOrFullNode = false; + + @VisibleForTesting + public DiskUsageBroadcaster(DiskUsageMonitor monitor) + { + this.monitor = monitor; + Gossiper.instance.register(this); + } + + /** + * @return {@code true} if any node in the cluster is STUFFED OR FULL + */ + public boolean hasStuffedOrFullNode() + { + return hasStuffedOrFullNode; + } + + /** + * @return {@code true} if given node's disk usage is FULL + */ + public boolean isFull(InetAddressAndPort endpoint) + { + return state(endpoint).isFull(); + } + + 
/** + * @return {@code true} if given node's disk usage is STUFFED + */ + public boolean isStuffed(InetAddressAndPort endpoint) + { + return state(endpoint).isStuffed(); + } + + @VisibleForTesting + public DiskUsageState state(InetAddressAndPort endpoint) + { + return usageInfo.getOrDefault(endpoint, DiskUsageState.NOT_AVAILABLE); + } + + public void startBroadcasting() + { + monitor.start(newState -> { + + if (logger.isTraceEnabled()) + logger.trace("Disseminating disk usage info: {}", newState); + + Gossiper.instance.addLocalApplicationState(ApplicationState.DISK_USAGE, + StorageService.instance.valueFactory.diskUsage(newState.name())); + }); + } + + @Override + public void onChange(InetAddressAndPort endpoint, ApplicationState state, VersionedValue value) + { + if (state != ApplicationState.DISK_USAGE) + return; + + DiskUsageState usageState = DiskUsageState.NOT_AVAILABLE; + try + { + usageState = DiskUsageState.valueOf(value.value); + } + catch (IllegalArgumentException e) + { + noSpamLogger.warn(String.format("Found unknown DiskUsageState: %s. 
Using default state %s instead.", + value.value, usageState)); + } + usageInfo.put(endpoint, usageState); + + hasStuffedOrFullNode = usageState.isStuffedOrFull() || computeHasStuffedOrFullNode(); + } + + private boolean computeHasStuffedOrFullNode() + { + for (DiskUsageState replicaState : usageInfo.values()) + { + if (replicaState.isStuffedOrFull()) + { + return true; + } + } + return false; + } + + @Override + public void onJoin(InetAddressAndPort endpoint, EndpointState epState) + { + updateDiskUsage(endpoint, epState); + } + + @Override + public void beforeChange(InetAddressAndPort endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) + { + // nothing to do here + } + + @Override + public void onAlive(InetAddressAndPort endpoint, EndpointState state) + { + updateDiskUsage(endpoint, state); + } + + @Override + public void onDead(InetAddressAndPort endpoint, EndpointState state) + { + // do nothing, as we don't care about dead nodes + } + + @Override + public void onRestart(InetAddressAndPort endpoint, EndpointState state) + { + updateDiskUsage(endpoint, state); + } + + @Override + public void onRemove(InetAddressAndPort endpoint) + { + usageInfo.remove(endpoint); + hasStuffedOrFullNode = usageInfo.values().stream().anyMatch(DiskUsageState::isStuffedOrFull); + } + + private void updateDiskUsage(InetAddressAndPort endpoint, EndpointState state) + { + VersionedValue localValue = state.getApplicationState(ApplicationState.DISK_USAGE); + + if (localValue != null) + { + onChange(endpoint, ApplicationState.DISK_USAGE, localValue); + } + } +} diff --git a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java new file mode 100644 index 0000000000..7395c5fb3b --- /dev/null +++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageMonitor.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.disk.usage; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.RoundingMode; +import java.nio.file.FileStore; +import java.nio.file.Files; +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.Memtable; +import org.apache.cassandra.db.guardrails.Guardrails; +import org.apache.cassandra.db.guardrails.GuardrailsConfig; +import org.apache.cassandra.io.util.FileUtils; + +/** + * Schedule periodic task to monitor local disk usage and notify {@link DiskUsageBroadcaster} if local state changed. 
+ */ +public class DiskUsageMonitor +{ + private static final Logger logger = LoggerFactory.getLogger(DiskUsageMonitor.class); + + public static DiskUsageMonitor instance = new DiskUsageMonitor(); + + private final Supplier<GuardrailsConfig> guardrailsConfigSupplier = () -> Guardrails.CONFIG_PROVIDER.getOrCreate(null); + private final Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier; + + private volatile DiskUsageState localState = DiskUsageState.NOT_AVAILABLE; + + @VisibleForTesting + public DiskUsageMonitor() + { + this.dataDirectoriesSupplier = DiskUsageMonitor::dataDirectoriesGroupedByFileStore; + } + + @VisibleForTesting + public DiskUsageMonitor(Supplier<Multimap<FileStore, Directories.DataDirectory>> dataDirectoriesSupplier) + { + this.dataDirectoriesSupplier = dataDirectoriesSupplier; + } + + /** + * Start monitoring local disk usage and call notifier when local disk usage state changed. + */ + public void start(Consumer<DiskUsageState> notifier) + { + // start the scheduler regardless guardrail is enabled, so we can enable it later without a restart + ScheduledExecutors.scheduledTasks.scheduleAtFixedRate(() -> { + + if (!Guardrails.localDataDiskUsage.enabled(null)) + return; + + updateLocalState(getDiskUsage(), notifier); + }, 0, CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.getLong(), TimeUnit.MILLISECONDS); + } + + @VisibleForTesting + public void updateLocalState(double usageRatio, Consumer<DiskUsageState> notifier) + { + double percentage = usageRatio * 100; + long percentageCeiling = (long) Math.ceil(percentage); + + DiskUsageState state = getState(percentageCeiling); + + Guardrails.localDataDiskUsage.guard(percentageCeiling, state.toString(), false, null); + + // if state remains unchanged, no need to notify peers + if (state == localState) + return; + + localState = state; + notifier.accept(state); + } + + /** + * @return local node disk usage state + */ + @VisibleForTesting + public DiskUsageState 
state() + { + return localState; + } + + /** + * @return The current disk usage (including all memtable sizes) ratio. This is the ratio between the space taken by + * all the data directories and the addition of that same space and the free available space on disk. The space + * taken by the data directories is the addition of the actual space on disk plus the size of the memtables. + * Memtables are included in that calculation because they are expected to be eventually flushed to disk. + */ + @VisibleForTesting + public double getDiskUsage() + { + // using BigInteger to handle large file system + BigInteger used = BigInteger.ZERO; // space used by data directories + BigInteger usable = BigInteger.ZERO; // free space on disks + + for (Map.Entry<FileStore, Collection<Directories.DataDirectory>> e : dataDirectoriesSupplier.get().asMap().entrySet()) + { + usable = usable.add(BigInteger.valueOf(usableSpace(e.getKey()))); + + for (Directories.DataDirectory dir : e.getValue()) + used = used.add(BigInteger.valueOf(dir.getRawSize())); + } + + // The total disk size for data directories is the space that is actually used by those directories plus the + // free space on disk that might be used for storing those directories in the future. + BigInteger total = used.add(usable); + + // That total space can be limited by the config property data_disk_usage_max_disk_size. + DataStorageSpec diskUsageMaxSize = guardrailsConfigSupplier.get().getDataDiskUsageMaxDiskSize(); + if (diskUsageMaxSize != null) + total = total.min(BigInteger.valueOf(diskUsageMaxSize.toBytes())); + + // Add memtables size to the amount of used space because those memtables will be flushed to data directories. 
+ used = used.add(BigInteger.valueOf(getAllMemtableSize())); + + if (logger.isTraceEnabled()) + logger.trace("Disk Usage Guardrail: current disk usage = {}, total disk usage = {}.", + FileUtils.stringifyFileSize(used.doubleValue()), + FileUtils.stringifyFileSize(total.doubleValue())); + + return new BigDecimal(used).divide(new BigDecimal(total), 5, RoundingMode.UP).doubleValue(); + } + + @VisibleForTesting + public long getAllMemtableSize() + { + long size = 0; + + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) + { + for (Memtable memtable : cfs.getTracker().getView().getAllMemtables()) + { + size += memtable.getLiveDataSize(); + } + } + + return size; + } + + @VisibleForTesting + public DiskUsageState getState(long usagePercentage) + { + if (!Guardrails.localDataDiskUsage.enabled()) + return DiskUsageState.NOT_AVAILABLE; + + if (Guardrails.localDataDiskUsage.failsOn(usagePercentage, null)) + return DiskUsageState.FULL; + + if (Guardrails.localDataDiskUsage.warnsOn(usagePercentage, null)) + return DiskUsageState.STUFFED; + + return DiskUsageState.SPACIOUS; + } + + public static Multimap<FileStore, Directories.DataDirectory> dataDirectoriesGroupedByFileStore() + { + Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create(); + try + { + for (Directories.DataDirectory dir : Directories.dataDirectories.getAllDirectories()) + { + FileStore store = Files.getFileStore(dir.location.toPath()); + directories.put(store, dir); + } + } + catch (IOException e) + { + throw new RuntimeException("Cannot get data directories grouped by file store", e); + } + return directories; + } + + public static long totalSpace(FileStore store) + { + try + { + long size = store.getTotalSpace(); + return size < 0 ? 
Long.MAX_VALUE : size; + } + catch (IOException e) + { + throw new RuntimeException("Cannot get total space of file store", e); + } + } + + public static long usableSpace(FileStore store) + { + try + { + long size = store.getUsableSpace(); + return size < 0 ? Long.MAX_VALUE : size; + } + catch (IOException e) + { + throw new RuntimeException("Cannot get usable size of file store", e); + } + } +} + diff --git a/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java new file mode 100644 index 0000000000..9a46251ff8 --- /dev/null +++ b/src/java/org/apache/cassandra/service/disk/usage/DiskUsageState.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.disk.usage; + +import org.apache.cassandra.db.guardrails.GuardrailsConfig; + +public enum DiskUsageState +{ + /** Either disk usage guardrail is not enabled or gossip state is not ready. */ + NOT_AVAILABLE("Not Available"), + + /** + * Disk usage is below both {@link GuardrailsConfig#getDataDiskUsagePercentageWarnThreshold()} ()} and + * {@link GuardrailsConfig#getDataDiskUsagePercentageFailThreshold()}. 
/**
 * Disk usage states of a node, disseminated via Gossip and consumed by the disk usage guardrails.
 * Ordered from least to most concerning: NOT_AVAILABLE, SPACIOUS, STUFFED, FULL.
 */
public enum DiskUsageState
{
    /** Either disk usage guardrail is not enabled or gossip state is not ready. */
    NOT_AVAILABLE("Not Available"),

    /** Disk usage is below both the configured warn and fail percentage thresholds. */
    SPACIOUS("Spacious"),

    /** Disk usage exceeds the configured warn threshold but is still below the fail threshold. */
    STUFFED("Stuffed"),

    /** Disk usage exceeds the configured fail threshold. */
    FULL("Full");

    /** Human-readable name, returned by {@link #toString()} and used in guardrail messages. */
    private final String msg;

    DiskUsageState(String msg)
    {
        this.msg = msg;
    }

    public boolean isFull()
    {
        return this == FULL;
    }

    public boolean isStuffed()
    {
        return this == STUFFED;
    }

    public boolean isStuffedOrFull()
    {
        return this == STUFFED || this == FULL;
    }

    @Override
    public String toString()
    {
        return msg;
    }
}
+ */ + +package org.apache.cassandra.distributed.test.guardrails; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; +import net.bytebuddy.implementation.MethodDelegation; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import org.apache.cassandra.Util; +import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.db.guardrails.Guardrails; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; +import org.apache.cassandra.service.disk.usage.DiskUsageMonitor; +import org.apache.cassandra.service.disk.usage.DiskUsageState; +import org.assertj.core.api.Assertions; + +import static net.bytebuddy.matcher.ElementMatchers.named; + +/** + * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}. 
 */
public class GuardrailDiskUsageTest extends GuardrailTester
{
    // number of writes issued in each phase of the test; large enough that the token distribution
    // sends some writes to each of the two nodes
    private static final int NUM_ROWS = 100;

    // expected substrings of the client warning / failure messages emitted by the replica disk usage guardrail
    private static final String WARN_MESSAGE = "Replica disk usage exceeds warning threshold";
    private static final String FAIL_MESSAGE = "Write request failed because disk usage exceeds failure threshold";

    private static Cluster cluster;
    private static com.datastax.driver.core.Cluster driverCluster;
    private static Session driverSession;

    @BeforeClass
    public static void setupCluster() throws IOException
    {
        // speed up the task that calculates and propagates the disk usage info
        CassandraRelevantProperties.DISK_USAGE_MONITOR_INTERVAL_MS.setInt(100);

        // build a 2-node cluster with RF=1
        cluster = init(Cluster.build(2)
                              .withInstanceInitializer(DiskStateInjection::install)
                              .withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL)
                                                .set("data_disk_usage_percentage_warn_threshold", 98)
                                                .set("data_disk_usage_percentage_fail_threshold", 99)
                                                .set("authenticator", "PasswordAuthenticator"))
                              .start(), 1);

        // create a regular user, since the default superuser is excluded from guardrails
        com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
        try (com.datastax.driver.core.Cluster c = builder.withCredentials("cassandra", "cassandra").build();
             Session session = c.connect())
        {
            session.execute("CREATE USER test WITH PASSWORD 'test'");
        }

        // connect using that regular (non-super) user; we use the driver to get access to the client warnings
        driverCluster = builder.withCredentials("test", "test").build();
        driverSession = driverCluster.connect();
    }

    @AfterClass
    public static void teardownCluster()
    {
        if (driverSession != null)
            driverSession.close();

        if (driverCluster != null)
            driverCluster.close();

        if (cluster != null)
            cluster.close();
    }

    @Override
    protected Cluster getCluster()
    {
        return cluster;
    }

    /**
     * Drives the replica disk usage guardrail through the full state machine
     * (SPACIOUS -> NOT_AVAILABLE -> STUFFED -> FULL -> SPACIOUS) by injecting disk usage states on each
     * node (see {@link DiskStateInjection}) and verifying the client warnings/failures seen by the driver.
     */
    @Test
    public void testDiskUsage() throws Throwable
    {
        schemaChange("CREATE TABLE %s (k int PRIMARY KEY, v int)");
        String insert = format("INSERT INTO %s(k, v) VALUES (?, 0)");

        // With both nodes in SPACIOUS state, we can write without warnings nor failures
        for (int i = 0; i < NUM_ROWS; i++)
        {
            ResultSet rs = driverSession.execute(insert, i);
            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
        }

        // If the disk usage information about one node becomes unavailable, we can still write without warnings
        DiskStateInjection.setState(getCluster(), 2, DiskUsageState.NOT_AVAILABLE);
        for (int i = 0; i < NUM_ROWS; i++)
        {
            ResultSet rs = driverSession.execute(insert, i);
            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
        }

        // If one node becomes STUFFED, the writes targeting that node will raise a warning, while the writes targeting
        // the node that remains SPACIOUS will keep succeeding without warnings
        DiskStateInjection.setState(getCluster(), 2, DiskUsageState.STUFFED);
        int numWarnings = 0;
        for (int i = 0; i < NUM_ROWS; i++)
        {
            ResultSet rs = driverSession.execute(insert, i);

            List<String> warnings = rs.getExecutionInfo().getWarnings();
            if (!warnings.isEmpty())
            {
                Assertions.assertThat(warnings).hasSize(1).anyMatch(s -> s.contains(WARN_MESSAGE));
                numWarnings++;
            }
        }
        // some (but not all) writes should have landed on the STUFFED node and warned
        Assertions.assertThat(numWarnings).isGreaterThan(0).isLessThan(NUM_ROWS);

        // If the STUFFED node becomes FULL, the writes targeting that node will fail, while the writes targeting
        // the node that remains SPACIOUS will keep succeeding without warnings
        DiskStateInjection.setState(getCluster(), 2, DiskUsageState.FULL);
        int numFailures = 0;
        for (int i = 0; i < NUM_ROWS; i++)
        {
            try
            {
                ResultSet rs = driverSession.execute(insert, i);
                Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
            }
            catch (InvalidQueryException e)
            {
                Assertions.assertThat(e).hasMessageContaining(FAIL_MESSAGE);
                numFailures++;
            }
        }
        // some (but not all) writes should have targeted the FULL node and been rejected
        Assertions.assertThat(numFailures).isGreaterThan(0).isLessThan(NUM_ROWS);

        // If both nodes are FULL, all queries will fail
        DiskStateInjection.setState(getCluster(), 1, DiskUsageState.FULL);
        for (int i = 0; i < NUM_ROWS; i++)
        {
            try
            {
                driverSession.execute(insert, i);
                Assertions.fail("Should have failed");
            }
            catch (InvalidQueryException e)
            {
                numFailures++;
            }
        }

        // Finally, if both nodes go back to SPACIOUS, all queries will succeed again
        DiskStateInjection.setState(getCluster(), 1, DiskUsageState.SPACIOUS);
        DiskStateInjection.setState(getCluster(), 2, DiskUsageState.SPACIOUS);
        for (int i = 0; i < NUM_ROWS; i++)
        {
            ResultSet rs = driverSession.execute(insert, i);
            Assertions.assertThat(rs.getExecutionInfo().getWarnings()).isEmpty();
        }
    }

    /**
     * ByteBuddy rule to override the disk usage state of each node.
     * {@code DiskUsageMonitor#getState} is intercepted so each node reports the injected state
     * instead of its real disk usage.
     */
    public static class DiskStateInjection
    {
        // the state this node will report; volatile so the periodic monitor task sees updates promptly
        public static volatile DiskUsageState state = DiskUsageState.SPACIOUS;

        // invoked by the in-jvm dtest framework on each node's classloader; 'node' is unused here
        // because every node gets the same interception
        private static void install(ClassLoader cl, int node)
        {
            new ByteBuddy().rebase(DiskUsageMonitor.class)
                           .method(named("getState"))
                           .intercept(MethodDelegation.to(DiskStateInjection.class))
                           .make()
                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
        }

        /** Injects the given state on the given node and blocks until every node has seen it via Gossip. */
        public static void setState(Cluster cluster, int node, DiskUsageState state)
        {
            IInvokableInstance instance = cluster.get(node);
            instance.runOnInstance(() -> DiskStateInjection.state = state);

            // wait for disk usage state propagation, all nodes must see it
            // NOTE(review): "enpoint" is a typo for "endpoint" — rename when convenient
            InetAddressAndPort enpoint = InetAddressAndPort.getByAddress(instance.broadcastAddress());
            cluster.forEach(n -> n.runOnInstance(() -> Util.spinAssertEquals(state, () -> DiskUsageBroadcaster.instance.state(enpoint), 60)));
        }

        // delegation target for the intercepted DiskUsageMonitor#getState; ignores the real percentage
        // and the original implementation ('zuper') and returns the injected state
        @SuppressWarnings("unused")
        public static DiskUsageState getState(long usagePercentage, @SuperCall Callable<DiskUsageState> zuper)
        {
            return state;
        }
    }
}
a/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java b/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java index 66c644498d..4dad97a0a8 100644 --- a/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java +++ b/test/unit/org/apache/cassandra/config/DataStorageSpecTest.java @@ -101,11 +101,36 @@ public class DataStorageSpecTest public void testEquals() { assertEquals(new DataStorageSpec("10B"), new DataStorageSpec("10B")); + assertEquals(new DataStorageSpec("10KiB"), new DataStorageSpec("10240B")); assertEquals(new DataStorageSpec("10240B"), new DataStorageSpec("10KiB")); + + assertEquals(new DataStorageSpec("10MiB"), new DataStorageSpec("10240KiB")); + assertEquals(new DataStorageSpec("10240KiB"), new DataStorageSpec("10MiB")); + + assertEquals(new DataStorageSpec("10GiB"), new DataStorageSpec("10240MiB")); + assertEquals(new DataStorageSpec("10240MiB"), new DataStorageSpec("10GiB")); + + assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE), DataStorageSpec.inGibibytes(Long.MAX_VALUE)); + assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE), DataStorageSpec.inMebibytes(Long.MAX_VALUE)); + assertNotEquals(DataStorageSpec.inBytes(Long.MAX_VALUE), DataStorageSpec.inKibibytes(Long.MAX_VALUE)); + assertEquals(DataStorageSpec.inBytes(Long.MAX_VALUE), DataStorageSpec.inBytes(Long.MAX_VALUE)); + + assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE), DataStorageSpec.inGibibytes(Long.MAX_VALUE)); + assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE), DataStorageSpec.inMebibytes(Long.MAX_VALUE)); + assertEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE), DataStorageSpec.inKibibytes(Long.MAX_VALUE)); + assertNotEquals(DataStorageSpec.inKibibytes(Long.MAX_VALUE), DataStorageSpec.inBytes(Long.MAX_VALUE)); + + assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), DataStorageSpec.inGibibytes(Long.MAX_VALUE)); assertEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), DataStorageSpec.inMebibytes(Long.MAX_VALUE)); - 
assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), DataStorageSpec.inKibibytes(Long.MAX_VALUE)); assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), DataStorageSpec.inBytes(Long.MAX_VALUE)); + assertNotEquals(DataStorageSpec.inMebibytes(Long.MAX_VALUE), DataStorageSpec.inBytes(Long.MAX_VALUE)); + + assertEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE), DataStorageSpec.inGibibytes(Long.MAX_VALUE)); + assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE), DataStorageSpec.inMebibytes(Long.MAX_VALUE)); + assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE), DataStorageSpec.inKibibytes(Long.MAX_VALUE)); + assertNotEquals(DataStorageSpec.inGibibytes(Long.MAX_VALUE), DataStorageSpec.inBytes(Long.MAX_VALUE)); + assertNotEquals(new DataStorageSpec("0MiB"), new DataStorageSpec("10KiB")); } @@ -138,4 +163,4 @@ public class DataStorageSpecTest Gen<DataStorageSpec> gen = rs -> new DataStorageSpec(valueGen.generate(rs), unitGen.generate(rs)); return gen.describedAs(DataStorageSpec::toString); } -} \ No newline at end of file +} diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java index 482e37b2ef..1483e8101f 100644 --- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailCollectionSizeTest.java @@ -48,12 +48,12 @@ public class GuardrailCollectionSizeTest extends ThresholdTester public GuardrailCollectionSizeTest() { - super(WARN_THRESHOLD / 1024, // to KiB - FAIL_THRESHOLD / 1024, // to KiB + super(WARN_THRESHOLD + "B", + FAIL_THRESHOLD + "B", Guardrails.collectionSize, - Guardrails::setCollectionSizeThresholdInKiB, - Guardrails::getCollectionSizeWarnThresholdInKiB, - Guardrails::getCollectionSizeFailThresholdInKiB); + Guardrails::setCollectionSizeThreshold, + Guardrails::getCollectionSizeWarnThreshold, + 
Guardrails::getCollectionSizeFailThreshold); } @After diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java new file mode 100644 index 0000000000..14c2c3acea --- /dev/null +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailDiskUsageTest.java @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.guardrails; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.nio.file.FileStore; +import java.util.Arrays; +import java.util.function.Consumer; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimap; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DataStorageSpec; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.disk.usage.DiskUsageBroadcaster; +import org.apache.cassandra.service.disk.usage.DiskUsageMonitor; +import org.apache.cassandra.service.disk.usage.DiskUsageState; +import org.apache.cassandra.utils.FBUtilities; +import org.mockito.Mockito; + +import static org.apache.cassandra.service.disk.usage.DiskUsageState.FULL; +import static org.apache.cassandra.service.disk.usage.DiskUsageState.NOT_AVAILABLE; +import static org.apache.cassandra.service.disk.usage.DiskUsageState.SPACIOUS; +import static org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** 
+ * Tests the guardrails for disk usage, {@link Guardrails#localDataDiskUsage} and {@link Guardrails#replicaDiskUsage}. + */ +public class GuardrailDiskUsageTest extends GuardrailTester +{ + private static int defaultDataDiskUsagePercentageWarnThreshold; + private static int defaultDataDiskUsagePercentageFailThreshold; + + @BeforeClass + public static void beforeClass() + { + defaultDataDiskUsagePercentageWarnThreshold = Guardrails.instance.getDataDiskUsagePercentageWarnThreshold(); + defaultDataDiskUsagePercentageFailThreshold = Guardrails.instance.getDataDiskUsagePercentageFailThreshold(); + + Guardrails.instance.setDataDiskUsagePercentageThreshold(-1, -1); + } + + @AfterClass + public static void afterClass() + { + Guardrails.instance.setDataDiskUsagePercentageThreshold(defaultDataDiskUsagePercentageWarnThreshold, + defaultDataDiskUsagePercentageFailThreshold); + } + + @Test + public void testConfigValidation() + { + assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize(null)); + assertNull(guardrails().getDataDiskUsageMaxDiskSize()); + + assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0B"), "0 is not allowed"); + assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0KiB"), "0 is not allowed"); + assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0MiB"), "0 is not allowed"); + assertConfigFails(x -> x.setDataDiskUsageMaxDiskSize("0GiB"), "0 is not allowed"); + + assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("10B")); + assertEquals("10B", guardrails().getDataDiskUsageMaxDiskSize()); + + assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("20KiB")); + assertEquals("20KiB", guardrails().getDataDiskUsageMaxDiskSize()); + + assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("30MiB")); + assertEquals("30MiB", guardrails().getDataDiskUsageMaxDiskSize()); + + assertConfigValid(x -> x.setDataDiskUsageMaxDiskSize("40GiB")); + assertEquals("40GiB", guardrails().getDataDiskUsageMaxDiskSize()); + + assertConfigFails(x -> 
x.setDataDiskUsageMaxDiskSize(Long.MAX_VALUE + "GiB"), "are actually available on disk"); + + // warn threshold smaller than lower bound + assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(0, 80), "0 is not allowed"); + + // fail threshold bigger than upper bound + assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(1, 110), "maximum allowed value is 100"); + + // warn threshold larger than fail threshold + assertConfigFails(x -> x.setDataDiskUsagePercentageThreshold(60, 50), + "The warn threshold 60 for data_disk_usage_percentage_warn_threshold should be lower than the fail threshold 50"); + } + + @Test + public void testDiskUsageState() + { + guardrails().setDataDiskUsagePercentageThreshold(50, 90); + + // under usage + assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(10)); + assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50)); + + // exceed warning threshold + assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51)); + assertEquals(STUFFED, DiskUsageMonitor.instance.getState(56)); + assertEquals(STUFFED, DiskUsageMonitor.instance.getState(90)); + + // exceed fail threshold + assertEquals(FULL, DiskUsageMonitor.instance.getState(91)); + assertEquals(FULL, DiskUsageMonitor.instance.getState(100)); + + // shouldn't be possible to go over 100% but just to be sure + assertEquals(FULL, DiskUsageMonitor.instance.getState(101)); + assertEquals(FULL, DiskUsageMonitor.instance.getState(500)); + } + + @Test + public void testDiskUsageDetectorWarnDisabled() + { + guardrails().setDataDiskUsagePercentageThreshold(-1, 90); + + // under usage + assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(0)); + assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50)); + assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(90)); + + // exceed fail threshold + assertEquals(FULL, DiskUsageMonitor.instance.getState(91)); + assertEquals(FULL, DiskUsageMonitor.instance.getState(100)); + } + + @Test + public void 
testDiskUsageDetectorFailDisabled() + { + guardrails().setDataDiskUsagePercentageThreshold(50, -1); + + // under usage + assertEquals(SPACIOUS, DiskUsageMonitor.instance.getState(50)); + + // exceed warning threshold + assertEquals(STUFFED, DiskUsageMonitor.instance.getState(51)); + assertEquals(STUFFED, DiskUsageMonitor.instance.getState(80)); + assertEquals(STUFFED, DiskUsageMonitor.instance.getState(100)); + } + + @Test + public void testDiskUsageGuardrailDisabled() + { + guardrails().setDataDiskUsagePercentageThreshold(-1, -1); + + assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(0)); + assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(60)); + assertEquals(NOT_AVAILABLE, DiskUsageMonitor.instance.getState(100)); + } + + @Test + public void testMemtableSizeIncluded() throws Throwable + { + DiskUsageMonitor monitor = new DiskUsageMonitor(); + + createTable(keyspace(), "CREATE TABLE %s (k text PRIMARY KEY, v text) WITH compression = { 'enabled': false }"); + + long memtableSizeBefore = monitor.getAllMemtableSize(); + int rows = 10; + int mb = 1024 * 1024; + + for (int i = 0; i < rows; i++) + { + char[] chars = new char[mb]; + Arrays.fill(chars, (char) i); + String value = String.copyValueOf(chars); + execute("INSERT INTO %s (k, v) VALUES(?, ?)", i, value); + } + + // verify memtables are included + long memtableSizeAfterInsert = monitor.getAllMemtableSize(); + assertTrue(String.format("Expect at least 10MB more data, but got before: %s and after: %d", + memtableSizeBefore, memtableSizeAfterInsert), + memtableSizeAfterInsert - memtableSizeBefore >= rows * mb); + + // verify memtable size are reduced after flush + flush(); + long memtableSizeAfterFlush = monitor.getAllMemtableSize(); + assertEquals(memtableSizeBefore, memtableSizeAfterFlush, mb); + } + + @Test + public void testMonitorLogsOnStateChange() + { + guardrails().setDataDiskUsagePercentageThreshold(50, 90); + + Guardrails.localDataDiskUsage.resetLastNotifyTime(); + + 
DiskUsageMonitor monitor = new DiskUsageMonitor(); + + // transit to SPACIOUS, no logging + assertMonitorStateTransition(0.50, SPACIOUS, monitor); + + // transit to STUFFED, expect warning + assertMonitorStateTransition(0.50001, STUFFED, monitor, true, "Local data disk usage 51%(Stuffed) exceeds warning threshold of 50%"); + + // remain as STUFFED, no logging because of min log interval + assertMonitorStateTransition(0.90, STUFFED, monitor); + + // transit to FULL, expect failure + assertMonitorStateTransition(0.90001, FULL, monitor, false, "Local data disk usage 91%(Full) exceeds failure threshold of 90%, will stop accepting writes"); + + // remain as FULL, no logging because of min log interval + assertMonitorStateTransition(0.99, FULL, monitor); + + // remain as FULL, no logging because of min log interval + assertMonitorStateTransition(5.0, FULL, monitor); + + // transit back to STUFFED, no warning because of min log interval + assertMonitorStateTransition(0.90, STUFFED, monitor); + + // transit back to FULL, no logging because of min log interval + assertMonitorStateTransition(0.900001, FULL, monitor); + + // transit back to STUFFED, no logging because of min log interval + assertMonitorStateTransition(0.90, STUFFED, monitor); + + // transit to SPACIOUS, no logging + assertMonitorStateTransition(0.50, SPACIOUS, monitor); + } + + @Test + public void testDiskUsageBroadcaster() throws UnknownHostException + { + DiskUsageBroadcaster broadcaster = new DiskUsageBroadcaster(null); + Gossiper.instance.unregister(broadcaster); + + InetAddressAndPort node1 = InetAddressAndPort.getByName("127.0.0.1"); + InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.2"); + InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.3"); + + // initially it's NOT_AVAILABLE + assertFalse(broadcaster.hasStuffedOrFullNode()); + assertFalse(broadcaster.isFull(node1)); + assertFalse(broadcaster.isFull(node2)); + assertFalse(broadcaster.isFull(node3)); + + // adding 1st 
node: Spacious, cluster has no Full node + broadcaster.onChange(node1, ApplicationState.DISK_USAGE, value(SPACIOUS)); + assertFalse(broadcaster.hasStuffedOrFullNode()); + assertFalse(broadcaster.isFull(node1)); + + // adding 2nd node with wrong ApplicationState + broadcaster.onChange(node2, ApplicationState.RACK, value(FULL)); + assertFalse(broadcaster.hasStuffedOrFullNode()); + assertFalse(broadcaster.isFull(node2)); + + // adding 2nd node: STUFFED + broadcaster.onChange(node2, ApplicationState.DISK_USAGE, value(STUFFED)); + assertTrue(broadcaster.hasStuffedOrFullNode()); + assertTrue(broadcaster.isStuffed(node2)); + + // adding 3rd node: FULL + broadcaster.onChange(node3, ApplicationState.DISK_USAGE, value(FULL)); + assertTrue(broadcaster.hasStuffedOrFullNode()); + assertTrue(broadcaster.isFull(node3)); + + // remove 2nd node, cluster has Full node + broadcaster.onRemove(node2); + assertTrue(broadcaster.hasStuffedOrFullNode()); + assertFalse(broadcaster.isStuffed(node2)); + + // remove 3rd node, cluster has no Full node + broadcaster.onRemove(node3); + assertFalse(broadcaster.hasStuffedOrFullNode()); + assertFalse(broadcaster.isFull(node3)); + } + + @Test + public void testDiskUsageCalculationWithMaxDiskSize() throws IOException + { + Directories.DataDirectory directory = mock(Directories.DataDirectory.class); + when(directory.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes()); + + FileStore store = mock(FileStore.class); + when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 5).toBytes()); // 100GiB disk + + Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create(); + directories.put(store, directory); + DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> directories)); + + doCallRealMethod().when(monitor).getDiskUsage(); + doReturn(0L).when(monitor).getAllMemtableSize(); + + guardrails().setDataDiskUsageMaxDiskSize(null); + assertThat(monitor.getDiskUsage()).isEqualTo(0.05); + + // 5G are used 
of 10G + guardrails().setDataDiskUsageMaxDiskSize("10GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(0.5); + + // max disk size = space used + guardrails().setDataDiskUsageMaxDiskSize("5GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(1.0); + + // max disk size < space used + guardrails().setDataDiskUsageMaxDiskSize("1GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(5.0); + } + + @Test + public void testDiskUsageCalculationWithMaxDiskSizeAndSmallUnits() throws IOException + { + // 5GiB used out of 100GiB disk + long freeDiskSizeInBytes = DataStorageSpec.inGibibytes(100).toBytes() - DataStorageSpec.inMebibytes(5).toBytes(); + + FileStore store = mock(FileStore.class); + when(store.getUsableSpace()).thenReturn(DataStorageSpec.inBytes(freeDiskSizeInBytes).toBytes()); // 100GiB disk + + Directories.DataDirectory directory = mock(Directories.DataDirectory.class); + when(directory.getRawSize()).thenReturn(DataStorageSpec.inMebibytes(5).toBytes()); + + Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create(); + directories.put(store, directory); + DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> directories)); + + doCallRealMethod().when(monitor).getDiskUsage(); + doReturn(0L).when(monitor).getAllMemtableSize(); + + guardrails().setDataDiskUsageMaxDiskSize(null); + assertThat(monitor.getDiskUsage()).isEqualTo(0.00005); + + // 5MiB are used of 10MiB + guardrails().setDataDiskUsageMaxDiskSize("10MiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(0.5); + + // max disk size = space used + guardrails().setDataDiskUsageMaxDiskSize("5MiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(1.0); + + // max disk size < space used + guardrails().setDataDiskUsageMaxDiskSize("1MiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(5.0); + } + + @Test + public void testDiskUsageCalculationWithMaxDiskSizeAndMultipleVolumes() throws IOException + { + Mockito.reset(); + + Multimap<FileStore, Directories.DataDirectory> directories = 
HashMultimap.create(); + + Directories.DataDirectory directory1 = mock(Directories.DataDirectory.class); + FileStore store1 = mock(FileStore.class); + when(directory1.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes()); + when(store1.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 5).toBytes()); // 100 GiB disk + directories.put(store1, directory1); + + Directories.DataDirectory directory2 = mock(Directories.DataDirectory.class); + FileStore store2 = mock(FileStore.class); + when(directory2.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(25).toBytes()); + when(store2.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 25).toBytes()); // 100 GiB disk + directories.put(store2, directory2); + + Directories.DataDirectory directory3 = mock(Directories.DataDirectory.class); + FileStore store3 = mock(FileStore.class); + when(directory3.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(20).toBytes()); + when(store3.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(100 - 20).toBytes()); // 100 GiB disk + directories.put(store3, directory3); + + DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> directories)); + + doCallRealMethod().when(monitor).getDiskUsage(); + doReturn(0L).when(monitor).getAllMemtableSize(); + + // 50G/300G as each disk has a capacity of 100G + guardrails().setDataDiskUsageMaxDiskSize(null); + assertThat(monitor.getDiskUsage()).isEqualTo(0.16667); + + // 50G/100G + guardrails().setDataDiskUsageMaxDiskSize("100GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(0.5); + + // 50G/75G + guardrails().setDataDiskUsageMaxDiskSize("75GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(0.66667); + + // 50G/50G + guardrails().setDataDiskUsageMaxDiskSize("50GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(1.0); + + // 50G/49G + guardrails().setDataDiskUsageMaxDiskSize("49GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(1.02041); + } + + @Test + public void 
testDiskUsageCalculationWithMaxDiskSizeAndMultipleDirectories() throws IOException + { + Mockito.reset(); + + Directories.DataDirectory directory1 = mock(Directories.DataDirectory.class); + when(directory1.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(5).toBytes()); + + Directories.DataDirectory directory2 = mock(Directories.DataDirectory.class); + when(directory2.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(25).toBytes()); + + Directories.DataDirectory directory3 = mock(Directories.DataDirectory.class); + when(directory3.getRawSize()).thenReturn(DataStorageSpec.inGibibytes(20).toBytes()); + + FileStore store = mock(FileStore.class); + when(store.getUsableSpace()).thenReturn(DataStorageSpec.inGibibytes(300 - 5 - 25 - 20).toBytes()); // 300 GiB disk + + Multimap<FileStore, Directories.DataDirectory> directories = HashMultimap.create(); + directories.putAll(store, ImmutableSet.of(directory1, directory2, directory3)); + + DiskUsageMonitor monitor = spy(new DiskUsageMonitor(() -> directories)); + + doCallRealMethod().when(monitor).getDiskUsage(); + doReturn(0L).when(monitor).getAllMemtableSize(); + + // 50G/300G as disk has a capacity of 300G + guardrails().setDataDiskUsageMaxDiskSize(null); + assertThat(monitor.getDiskUsage()).isEqualTo(0.16667); + + // 50G/100G + guardrails().setDataDiskUsageMaxDiskSize("100GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(0.5); + + // 50G/75G + guardrails().setDataDiskUsageMaxDiskSize("75GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(0.66667); + + // 50G/50G + guardrails().setDataDiskUsageMaxDiskSize("50GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(1.0); + + // 50G/49G + guardrails().setDataDiskUsageMaxDiskSize("49GiB"); + assertThat(monitor.getDiskUsage()).isEqualTo(1.02041); + } + + @Test + public void testWriteRequests() throws Throwable + { + createTable("CREATE TABLE %s (k int PRIMARY KEY, v int)"); + + InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); + InetAddressAndPort 
node1 = InetAddressAndPort.getByName("127.0.0.11"); + InetAddressAndPort node2 = InetAddressAndPort.getByName("127.0.0.21"); + InetAddressAndPort node3 = InetAddressAndPort.getByName("127.0.0.31"); + + Guardrails.replicaDiskUsage.resetLastNotifyTime(); + guardrails().setDataDiskUsagePercentageThreshold(98, 99); + + ConsistencyLevel cl = ConsistencyLevel.LOCAL_QUORUM; + String select = "SELECT * FROM %s"; + String insert = "INSERT INTO %s (k, v) VALUES (0, 0)"; + String batch = "BEGIN BATCH " + + "INSERT INTO %s (k, v) VALUES (1, 1);" + + "INSERT INTO %<s (k, v) VALUES (2, 2); " + + "APPLY BATCH"; + CheckedFunction userSelect = () -> execute(userClientState, select, cl); + CheckedFunction userInsert = () -> execute(userClientState, insert, cl); + CheckedFunction userBatch = () -> execute(userClientState, batch, cl); + + // default state, write request works fine + assertValid(userSelect); + assertValid(userInsert); + assertValid(userBatch); + + // verify node1 NOT_AVAILABLE won't affect writes + setDiskUsageState(node1, NOT_AVAILABLE); + assertValid(userSelect); + assertValid(userInsert); + assertValid(userBatch); + + // verify node2 Spacious won't affect writes + setDiskUsageState(node2, SPACIOUS); + assertValid(userSelect); + assertValid(userInsert); + assertValid(userBatch); + + // verify node3 STUFFED won't trigger warning as it's not write replica + setDiskUsageState(node3, STUFFED); + assertValid(userSelect); + assertValid(userInsert); + assertValid(userBatch); + + // verify node3 Full won't affect writes as it's not write replica + setDiskUsageState(node3, FULL); + assertValid(userSelect); + assertValid(userInsert); + assertValid(userBatch); + + // verify local node STUFFED, will log warning + setDiskUsageState(local, STUFFED); + assertValid(userSelect); + assertWarns(userInsert); + assertWarns(userBatch); + + // verify local node Full, will reject writes + setDiskUsageState(local, FULL); + assertValid(userSelect); + assertFails(userInsert); + 
assertFails(userBatch); + + // excluded users can write to FULL cluster + useSuperUser(); + Guardrails.replicaDiskUsage.resetLastNotifyTime(); + for (ClientState excludedUser : Arrays.asList(systemClientState, superClientState)) + { + assertValid(() -> execute(excludedUser, select, cl)); + assertValid(() -> execute(excludedUser, insert, cl)); + assertValid(() -> execute(excludedUser, batch, cl)); + } + + // verify local node STUFFED won't reject writes + setDiskUsageState(local, STUFFED); + assertValid(userSelect); + assertWarns(userInsert); + assertWarns(userBatch); + } + + @Override + protected void assertValid(CheckedFunction function) throws Throwable + { + Guardrails.replicaDiskUsage.resetLastNotifyTime(); + super.assertValid(function); + } + + protected void assertWarns(CheckedFunction function) throws Throwable + { + Guardrails.replicaDiskUsage.resetLastNotifyTime(); + super.assertWarns(function, "Replica disk usage exceeds warning threshold"); + } + + protected void assertFails(CheckedFunction function) throws Throwable + { + Guardrails.replicaDiskUsage.resetLastNotifyTime(); + super.assertFails(function, "Write request failed because disk usage exceeds failure threshold"); + } + + private static void setDiskUsageState(InetAddressAndPort endpoint, DiskUsageState state) + { + DiskUsageBroadcaster.instance.onChange(endpoint, ApplicationState.DISK_USAGE, value(state)); + } + + private static VersionedValue value(DiskUsageState state) + { + return StorageService.instance.valueFactory.diskUsage(state.name()); + } + + private void assertMonitorStateTransition(double usageRatio, DiskUsageState state, DiskUsageMonitor monitor) + { + assertMonitorStateTransition(usageRatio, state, monitor, false, null); + } + + private void assertMonitorStateTransition(double usageRatio, DiskUsageState state, DiskUsageMonitor monitor, + boolean isWarn, String msg) + { + boolean stateChanged = state != monitor.state(); + Consumer<DiskUsageState> notifier = newState -> { + if 
(stateChanged) + assertEquals(state, newState); + else + fail("Expect no notification if state remains the same"); + }; + + monitor.updateLocalState(usageRatio, notifier); + assertEquals(state, monitor.state()); + + if (msg == null) + { + listener.assertNotFailed(); + listener.assertNotWarned(); + } + else if (isWarn) + { + listener.assertWarned(msg); + listener.assertNotFailed(); + } + else + { + listener.assertFailed(msg); + listener.assertNotWarned(); + } + + listener.clear(); + } +} diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java index 74747bbe5a..986e1d698f 100644 --- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailTester.java @@ -427,6 +427,11 @@ public abstract class GuardrailTester extends CQLTester .collect(Collectors.toList()); } + protected void assertConfigValid(Consumer<Guardrails> consumer) + { + consumer.accept(guardrails()); + } + protected void assertConfigFails(Consumer<Guardrails> consumer, String message) { try @@ -533,6 +538,11 @@ public abstract class GuardrailTester extends CQLTester assertTrue(format("Expect no warning diagnostic events but got %s", warnings), warnings.isEmpty()); } + public void assertWarned(String message) + { + assertWarned(Collections.singletonList(message)); + } + public void assertWarned(List<String> messages) { assertFalse("Expected to emit warning diagnostic event, but no warning was emitted", warnings.isEmpty()); diff --git a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java index b98659e9a5..b278e7f6f8 100644 --- a/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java +++ b/test/unit/org/apache/cassandra/db/guardrails/GuardrailsTest.java @@ -293,6 +293,52 @@ public class GuardrailsTest extends GuardrailTester assertValid(() -> 
disallowed.guard(set(4), action, superClientState)); } + @Test + public void testPredicates() throws Throwable + { + Predicates<Integer> guard = new Predicates<>("x", + state -> x -> x > 10, + state -> x -> x > 100, + (isWarn, value) -> format("%s: %s", isWarn ? "Warning" : "Aborting", value)); + + assertValid(() -> guard.guard(5, userClientState)); + assertWarns(() -> guard.guard(25, userClientState), "Warning: 25"); + assertWarns(() -> guard.guard(100, userClientState), "Warning: 100"); + assertFails(() -> guard.guard(101, userClientState), "Aborting: 101"); + assertFails(() -> guard.guard(200, userClientState), "Aborting: 200"); + assertValid(() -> guard.guard(5, userClientState)); + } + + @Test + public void testPredicatesUsers() throws Throwable + { + Predicates<Integer> guard = new Predicates<>("x", + state -> x -> x > 10, + state -> x -> x > 100, + (isWarn, value) -> format("%s: %s", isWarn ? "Warning" : "Aborting", value)); + + assertTrue(guard.enabled()); + assertTrue(guard.enabled(null)); + assertTrue(guard.enabled(userClientState)); + assertFalse(guard.enabled(systemClientState)); + assertFalse(guard.enabled(superClientState)); + + assertValid(() -> guard.guard(5, null)); + assertValid(() -> guard.guard(5, userClientState)); + assertValid(() -> guard.guard(5, systemClientState)); + assertValid(() -> guard.guard(5, superClientState)); + + assertWarns(() -> guard.guard(25, null), "Warning: 25"); + assertWarns(() -> guard.guard(25, userClientState), "Warning: 25"); + assertValid(() -> guard.guard(25, systemClientState)); + assertValid(() -> guard.guard(25, superClientState)); + + assertFails(() -> guard.guard(101, null), false, "Aborting: 101"); + assertFails(() -> guard.guard(101, userClientState), "Aborting: 101"); + assertValid(() -> guard.guard(101, systemClientState)); + assertValid(() -> guard.guard(101, superClientState)); + } + private static Set<Integer> set(Integer value) { return Collections.singleton(value); diff --git 
a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java index 885f626881..a04cc9933b 100644 --- a/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java +++ b/test/unit/org/apache/cassandra/db/guardrails/ThresholdTester.java @@ -21,13 +21,14 @@ package org.apache.cassandra.db.guardrails; import java.util.Collections; import java.util.List; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.function.ToIntFunction; import java.util.function.ToLongFunction; import org.junit.Before; import org.junit.Test; -import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DataStorageSpec; import org.apache.cassandra.exceptions.ConfigurationException; import org.assertj.core.api.Assertions; @@ -47,7 +48,7 @@ public abstract class ThresholdTester extends GuardrailTester private final ToLongFunction<Guardrails> warnGetter; private final ToLongFunction<Guardrails> failGetter; private final long maxValue; - private final long disabledValue; + private final Long disabledValue; protected ThresholdTester(int warnThreshold, int failThreshold, @@ -63,7 +64,7 @@ public abstract class ThresholdTester extends GuardrailTester this.warnGetter = g -> (long) warnGetter.applyAsInt(g); this.failGetter = g -> (long) failGetter.applyAsInt(g); maxValue = Integer.MAX_VALUE; - disabledValue = Config.DISABLED_GUARDRAIL; + disabledValue = -1L; } protected ThresholdTester(long warnThreshold, @@ -80,7 +81,24 @@ public abstract class ThresholdTester extends GuardrailTester this.warnGetter = warnGetter; this.failGetter = failGetter; maxValue = Long.MAX_VALUE; - disabledValue = Config.DISABLED_SIZE_GUARDRAIL.toBytes(); + disabledValue = -1L; + } + + protected ThresholdTester(String warnThreshold, + String failThreshold, + Threshold threshold, + TriConsumer<Guardrails, String, String> setter, + Function<Guardrails, String> warnGetter, + Function<Guardrails, 
String> failGetter) + { + super(threshold); + this.warnThreshold = new DataStorageSpec(warnThreshold).toBytes(); + this.failThreshold = new DataStorageSpec(failThreshold).toBytes(); + this.setter = (g, w, a) -> setter.accept(g, w == null ? null : DataStorageSpec.inBytes(w).toString(), a == null ? null : DataStorageSpec.inBytes(a).toString()); + this.warnGetter = g -> new DataStorageSpec(warnGetter.apply(g)).toBytes(); + this.failGetter = g -> new DataStorageSpec(failGetter.apply(g)).toBytes(); + maxValue = Long.MAX_VALUE; + disabledValue = null; } protected long currentValue() @@ -225,7 +243,7 @@ public abstract class ThresholdTester extends GuardrailTester assertInvalidStrictlyPositiveProperty(setter, Integer.MIN_VALUE, name); assertInvalidStrictlyPositiveProperty(setter, -2, name); assertValidProperty(setter, disabledValue); - assertInvalidStrictlyPositiveProperty(setter, disabledValue == 0 ? -1 : 0, name); + assertInvalidStrictlyPositiveProperty(setter, disabledValue == null ? -1 : 0, name); assertValidProperty(setter, 1L); assertValidProperty(setter, 2L); assertValidProperty(setter, maxValue); diff --git a/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java b/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java index a2bce7d3d6..3f859dcf99 100644 --- a/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/GossipInfoTableTest.java @@ -80,7 +80,7 @@ public class GossipInfoTableTest extends CQLTester assertThat(entry).isNotEmpty(); UntypedResultSet.Row row = resultSet.one(); - assertThat(row.getColumns().size()).isEqualTo(62); + assertThat(row.getColumns().size()).isEqualTo(64); InetAddressAndPort endpoint = entry.get().getKey(); EndpointState localState = entry.get().getValue(); @@ -109,6 +109,7 @@ public class GossipInfoTableTest extends CQLTester assertValue(row, "native_address_and_port", localState, ApplicationState.NATIVE_ADDRESS_AND_PORT); assertValue(row, 
"status_with_port", localState, ApplicationState.STATUS_WITH_PORT); assertValue(row, "sstable_versions", localState, ApplicationState.SSTABLE_VERSIONS); + assertValue(row, "disk_usage", localState, ApplicationState.DISK_USAGE); assertValue(row, "x_11_padding", localState, ApplicationState.X_11_PADDING); assertValue(row, "x1", localState, ApplicationState.X1); assertValue(row, "x2", localState, ApplicationState.X2); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org