This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 481feb32a8 Add a fuzzy/regex pattern-matching for metric allow and block list (#36250) 481feb32a8 is described below commit 481feb32a835277bbd26715c73c2bb85b25b99ad Author: D. Ferruzzi <ferru...@amazon.com> AuthorDate: Fri Jan 12 16:19:39 2024 -0800 Add a fuzzy/regex pattern-matching for metric allow and block list (#36250) * Add a fuzzy match for metric allow and block list * Add deprecation notice and regex pattern unit tests * rename "fuzzy matching" to "pattern matching" * Improve pattern-matching test * clarify docstring phrasing Co-authored-by: Niko Oliveira <oniko...@amazon.com> * line length issue * rephrased docs * hussein changes --------- Co-authored-by: Niko Oliveira <oniko...@amazon.com> --- airflow/config_templates/config.yml | 24 ++++-- airflow/metrics/otel_logger.py | 11 ++- airflow/metrics/statsd_logger.py | 17 +---- airflow/metrics/validators.py | 64 +++++++++++++++- tests/core/test_stats.py | 148 +++++++++++++++++++++++++++++------- 5 files changed, 208 insertions(+), 56 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 1788e00593..79932a876a 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -944,24 +944,32 @@ metrics: description: | StatsD (https://github.com/etsy/statsd) integration settings. options: + metrics_use_pattern_match: + description: | + If true, metrics_allow_list and metrics_block_list will use regex pattern matching + anywhere within the metric name instead of only prefix matching at the start of the name. 
+ version_added: 2.9.0 + type: boolean + example: ~ + default: "False" metrics_allow_list: description: | - If you want to avoid emitting all the available metrics, you can configure an - allow list of prefixes (comma separated) to send only the metrics that start - with the elements of the list (e.g: "scheduler,executor,dagrun") + Configure an allow list (comma separated string) to send only certain metrics. + If metrics_use_pattern_match is false, match only the exact metric name prefix. + If metrics_use_pattern_match is true, provide regex patterns to match. version_added: 2.6.0 type: string - example: ~ + example: "\"scheduler,executor,dagrun\" or \"^scheduler,^executor,heartbeat|timeout\"" default: "" metrics_block_list: description: | - If you want to avoid emitting all the available metrics, you can configure a - block list of prefixes (comma separated) to filter out metrics that start with - the elements of the list (e.g: "scheduler,executor,dagrun"). + Configure a block list (comma separated string) to block certain metrics from being emitted. If metrics_allow_list and metrics_block_list are both configured, metrics_block_list is ignored. + If metrics_use_pattern_match is false, match only the exact metric name prefix. + If metrics_use_pattern_match is true, provide regex patterns to match. 
version_added: 2.6.0 type: string - example: ~ + example: "\"scheduler,executor,dagrun\" or \"^scheduler,^executor,heartbeat|timeout\"" default: "" statsd_on: description: | diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py index 2668f1ec97..6e88975f1b 100644 --- a/airflow/metrics/otel_logger.py +++ b/airflow/metrics/otel_logger.py @@ -35,6 +35,8 @@ from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( OTEL_NAME_MAX_LENGTH, AllowListValidator, + ListValidator, + get_validator, stat_name_otel_handler, ) @@ -166,11 +168,11 @@ class SafeOtelLogger: self, otel_provider, prefix: str = DEFAULT_METRIC_NAME_PREFIX, - allow_list_validator=AllowListValidator(), + metrics_validator: ListValidator = AllowListValidator(), ): self.otel: Callable = otel_provider self.prefix: str = prefix - self.metrics_validator = allow_list_validator + self.metrics_validator = metrics_validator self.meter = otel_provider.get_meter(__name__) self.metrics_map = MetricsMap(self.meter) @@ -393,9 +395,6 @@ def get_otel_logger(cls) -> SafeOtelLogger: interval = conf.getint("metrics", "otel_interval_milliseconds", fallback=None) # ex: 30000 debug = conf.getboolean("metrics", "otel_debugging_on") - allow_list = conf.get("metrics", "metrics_allow_list", fallback=None) - allow_list_validator = AllowListValidator(allow_list) - resource = Resource(attributes={SERVICE_NAME: "Airflow"}) protocol = "https" if ssl_active else "http" @@ -424,4 +423,4 @@ def get_otel_logger(cls) -> SafeOtelLogger: ), ) - return SafeOtelLogger(metrics.get_meter_provider(), prefix, allow_list_validator) + return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator()) diff --git a/airflow/metrics/statsd_logger.py b/airflow/metrics/statsd_logger.py index 5e399818ab..5b04c0b95a 100644 --- a/airflow/metrics/statsd_logger.py +++ b/airflow/metrics/statsd_logger.py @@ -27,6 +27,7 @@ from airflow.metrics.protocols import Timer from airflow.metrics.validators import ( 
AllowListValidator, BlockListValidator, + get_validator, validate_stat, ) @@ -160,8 +161,6 @@ def get_statsd_logger(cls) -> SafeStatsdLogger: from statsd import StatsClient stats_class = conf.getimport("metrics", "statsd_custom_client_path", fallback=None) - metrics_validator: ListValidator - if stats_class: if not issubclass(stats_class, StatsClient): raise AirflowConfigException( @@ -179,17 +178,7 @@ def get_statsd_logger(cls) -> SafeStatsdLogger: port=conf.getint("metrics", "statsd_port"), prefix=conf.get("metrics", "statsd_prefix"), ) - if conf.get("metrics", "metrics_allow_list", fallback=None): - metrics_validator = AllowListValidator(conf.get("metrics", "metrics_allow_list")) - if conf.get("metrics", "metrics_block_list", fallback=None): - log.warning( - "Ignoring metrics_block_list as both metrics_allow_list " - "and metrics_block_list have been set" - ) - elif conf.get("metrics", "metrics_block_list", fallback=None): - metrics_validator = BlockListValidator(conf.get("metrics", "metrics_block_list")) - else: - metrics_validator = AllowListValidator() + influxdb_tags_enabled = conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False) metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None)) - return SafeStatsdLogger(statsd, metrics_validator, influxdb_tags_enabled, metric_tags_validator) + return SafeStatsdLogger(statsd, get_validator(), influxdb_tags_enabled, metric_tags_validator) diff --git a/airflow/metrics/validators.py b/airflow/metrics/validators.py index 8bd6dd4476..9fbfa6600d 100644 --- a/airflow/metrics/validators.py +++ b/airflow/metrics/validators.py @@ -29,7 +29,7 @@ from typing import Callable, Iterable, Pattern, cast import re2 from airflow.configuration import conf -from airflow.exceptions import InvalidStatsNameException +from airflow.exceptions import InvalidStatsNameException, RemovedInAirflow3Warning log = logging.getLogger(__name__) @@ -85,6 +85,42 @@ 
BACK_COMPAT_METRIC_NAME_PATTERNS: set[str] = { BACK_COMPAT_METRIC_NAMES: set[Pattern[str]] = {re2.compile(name) for name in BACK_COMPAT_METRIC_NAME_PATTERNS} OTEL_NAME_MAX_LENGTH = 63 +DEFAULT_VALIDATOR_TYPE = "allow" + + +def get_validator() -> ListValidator: + validators = { + "basic": {"allow": AllowListValidator, "block": BlockListValidator}, + "pattern": {"allow": PatternAllowListValidator, "block": PatternBlockListValidator}, + } + metric_lists = { + "allow": (metric_allow_list := conf.get("metrics", "metrics_allow_list", fallback=None)), + "block": (metric_block_list := conf.get("metrics", "metrics_block_list", fallback=None)), + } + + use_pattern = conf.getboolean("metrics", "metrics_use_pattern_match", fallback=False) + validator_type = "pattern" if use_pattern else "basic" + + if not use_pattern: + warnings.warn( + "The basic metric validator will be deprecated in the future in favor of pattern-matching. " + "You can try this now by setting config option metrics_use_pattern_match to True.", + RemovedInAirflow3Warning, + stacklevel=2, + ) + + if metric_allow_list: + list_type = "allow" + if metric_block_list: + log.warning( + "Ignoring metrics_block_list as both metrics_allow_list and metrics_block_list have been set." 
+ ) + elif metric_block_list: + list_type = "block" + else: + list_type = DEFAULT_VALIDATOR_TYPE + + return validators[validator_type][list_type](metric_lists[list_type]) def validate_stat(fn: Callable) -> Callable: @@ -221,6 +257,12 @@ class ListValidator(metaclass=abc.ABCMeta): """Test if name is allowed.""" raise NotImplementedError + def _has_pattern_match(self, name: str) -> bool: + for entry in self.validate_list or (): + if re2.findall(entry, name.strip().lower()): + return True + return False + class AllowListValidator(ListValidator): """AllowListValidator only allows names that match the allowed prefixes.""" @@ -232,6 +274,16 @@ class AllowListValidator(ListValidator): return True # default is all metrics are allowed +class PatternAllowListValidator(ListValidator): + """Match the provided strings anywhere in the metric name.""" + + def test(self, name: str) -> bool: + if self.validate_list is not None: + return super()._has_pattern_match(name) + else: + return True # default is all metrics are allowed + + class BlockListValidator(ListValidator): """BlockListValidator only allows names that do not match the blocked prefixes.""" @@ -240,3 +292,13 @@ class BlockListValidator(ListValidator): return not name.strip().lower().startswith(self.validate_list) else: return True # default is all metrics are allowed + + +class PatternBlockListValidator(ListValidator): + """Only allow names that do not match the blocked strings.""" + + def test(self, name: str) -> bool: + if self.validate_list is not None: + return not super()._has_pattern_match(name) + else: + return True # default is all metrics are allowed diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py index 49ea0db800..31e4559600 100644 --- a/tests/core/test_stats.py +++ b/tests/core/test_stats.py @@ -18,6 +18,7 @@ from __future__ import annotations import importlib +import logging import re from unittest import mock from unittest.mock import Mock @@ -29,7 +30,12 @@ import airflow from 
airflow.exceptions import AirflowConfigException, InvalidStatsNameException from airflow.metrics.datadog_logger import SafeDogStatsdLogger from airflow.metrics.statsd_logger import SafeStatsdLogger -from airflow.metrics.validators import AllowListValidator, BlockListValidator +from airflow.metrics.validators import ( + AllowListValidator, + BlockListValidator, + PatternAllowListValidator, + PatternBlockListValidator, +) from tests.test_utils.config import conf_vars @@ -265,40 +271,128 @@ class TestDogStats: importlib.reload(airflow.stats) -class TestStatsWithAllowList: - def setup_method(self): - self.statsd_client = Mock(spec=statsd.StatsClient) - self.stats = SafeStatsdLogger(self.statsd_client, AllowListValidator("stats_one, stats_two")) +class TestStatsAllowAndBlockLists: + @pytest.mark.parametrize( + "validator, stat_name, expect_incr", + [ + (PatternAllowListValidator, "stats_one", True), + (PatternAllowListValidator, "stats_two.bla", True), + (PatternAllowListValidator, "stats_three.foo", True), + (PatternAllowListValidator, "stats_foo_three", True), + (PatternAllowListValidator, "stats_three", False), + (AllowListValidator, "stats_one", True), + (AllowListValidator, "stats_two.bla", True), + (AllowListValidator, "stats_three.foo", False), + (AllowListValidator, "stats_foo_three", False), + (AllowListValidator, "stats_three", False), + (PatternBlockListValidator, "stats_one", False), + (PatternBlockListValidator, "stats_two.bla", False), + (PatternBlockListValidator, "stats_three.foo", False), + (PatternBlockListValidator, "stats_foo_three", False), + (PatternBlockListValidator, "stats_foo", False), + (PatternBlockListValidator, "stats_three", True), + (BlockListValidator, "stats_one", False), + (BlockListValidator, "stats_two.bla", False), + (BlockListValidator, "stats_three.foo", True), + (BlockListValidator, "stats_foo_three", True), + (BlockListValidator, "stats_three", True), + ], + ) + def test_allow_and_block_list(self, validator, stat_name, 
expect_incr): + statsd_client = Mock(spec=statsd.StatsClient) + stats = SafeStatsdLogger(statsd_client, validator("stats_one, stats_two, foo")) + + stats.incr(stat_name) + + if expect_incr: + statsd_client.incr.assert_called_once_with(stat_name, 1, 1) + else: + statsd_client.assert_not_called() + + @pytest.mark.parametrize( + "match_pattern, expect_incr", + [ + ("^stat", True), + ("a.{4}o", True), + ("^banana", False), + ], + ) + def test_regex_matches(self, match_pattern, expect_incr): + stat_name = "stats_foo_one" + validator = PatternAllowListValidator - def test_increment_counter_with_allowed_key(self): - self.stats.incr("stats_one") - self.statsd_client.incr.assert_called_once_with("stats_one", 1, 1) + statsd_client = Mock(spec=statsd.StatsClient) + stats = SafeStatsdLogger(statsd_client, validator(match_pattern)) - def test_increment_counter_with_allowed_prefix(self): - self.stats.incr("stats_two.bla") - self.statsd_client.incr.assert_called_once_with("stats_two.bla", 1, 1) + stats.incr(stat_name) - def test_not_increment_counter_if_not_allowed(self): - self.stats.incr("stats_three") - self.statsd_client.assert_not_called() + if expect_incr: + statsd_client.incr.assert_called_once_with(stat_name, 1, 1) + else: + statsd_client.assert_not_called() -class TestStatsWithBlockList: - def setup_method(self): - self.statsd_client = Mock(spec=statsd.StatsClient) - self.stats = SafeStatsdLogger(self.statsd_client, BlockListValidator("stats_one, stats_two")) +class TestPatternOrBasicValidatorConfigOption: + def teardown_method(self): + # Avoid side-effects + importlib.reload(airflow.stats) - def test_increment_counter_with_allowed_key(self): - self.stats.incr("stats_one") - self.statsd_client.assert_not_called() + stats_on = {("metrics", "statsd_on"): "True"} + pattern_on = {("metrics", "metrics_use_pattern_match"): "True"} + pattern_off = {("metrics", "metrics_use_pattern_match"): "False"} + allow_list = {("metrics", "metrics_allow_list"): "foo,bar"} + block_list = 
{("metrics", "metrics_block_list"): "foo,bar"} + + @pytest.mark.parametrize( + "config, expected", + [ + pytest.param( + {**stats_on, **pattern_on}, + PatternAllowListValidator, + id="pattern_allow_by_default", + ), + pytest.param( + stats_on, + AllowListValidator, + id="basic_allow_by_default", + ), + pytest.param( + {**stats_on, **pattern_on, **allow_list}, + PatternAllowListValidator, + id="pattern_allow_list_provided", + ), + pytest.param( + {**stats_on, **pattern_off, **allow_list}, + AllowListValidator, + id="basic_allow_list_provided", + ), + pytest.param( + {**stats_on, **pattern_on, **block_list}, + PatternBlockListValidator, + id="pattern_block_list_provided", + ), + pytest.param( + {**stats_on, **block_list}, + BlockListValidator, + id="basic_block_list_provided", + ), + ], + ) + def test_pattern_or_basic_picker(self, config, expected): + with conf_vars(config): + importlib.reload(airflow.stats) - def test_increment_counter_with_allowed_prefix(self): - self.stats.incr("stats_two.bla") - self.statsd_client.assert_not_called() + assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient) + assert type(airflow.stats.Stats.instance.metrics_validator) == expected - def test_not_increment_counter_if_not_allowed(self): - self.stats.incr("stats_three") - self.statsd_client.incr.assert_called_once_with("stats_three", 1, 1) + @conf_vars({**stats_on, **block_list, ("metrics", "metrics_allow_list"): "bax,qux"}) + def test_setting_allow_and_block_logs_warning(self, caplog): + importlib.reload(airflow.stats) + + assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient) + assert type(airflow.stats.Stats.instance.metrics_validator) == AllowListValidator + with caplog.at_level(logging.WARNING): + assert "Ignoring metrics_block_list" in caplog.text class TestDogStatsWithAllowList: