This is an automated email from the ASF dual-hosted git repository.

ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 481feb32a8 Add a fuzzy/regex pattern-matching for metric allow and block list (#36250)
481feb32a8 is described below

commit 481feb32a835277bbd26715c73c2bb85b25b99ad
Author: D. Ferruzzi <ferru...@amazon.com>
AuthorDate: Fri Jan 12 16:19:39 2024 -0800

    Add a fuzzy/regex pattern-matching for metric allow and block list (#36250)
    
    * Add a fuzzy match for metric allow and block list
    
    * Add deprecation notice and regex pattern unit tests
    
    * rename "fuzzy matching" to "pattern matching"
    
    * Improve pattern-matching test
    
    * clarify docstring phrasing
    
    Co-authored-by: Niko Oliveira <oniko...@amazon.com>
    
    * line length issue
    
    * rephrased docs
    
    * hussein changes
    
    ---------
    
    Co-authored-by: Niko Oliveira <oniko...@amazon.com>
---
 airflow/config_templates/config.yml |  24 ++++--
 airflow/metrics/otel_logger.py      |  11 ++-
 airflow/metrics/statsd_logger.py    |  17 +----
 airflow/metrics/validators.py       |  64 +++++++++++++++-
 tests/core/test_stats.py            | 148 +++++++++++++++++++++++++++++-------
 5 files changed, 208 insertions(+), 56 deletions(-)

diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 1788e00593..79932a876a 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -944,24 +944,32 @@ metrics:
   description: |
     StatsD (https://github.com/etsy/statsd) integration settings.
   options:
+    metrics_use_pattern_match:
+      description: |
+        If true, metrics_allow_list and metrics_block_list will use regex 
pattern matching
+        anywhere within the metric name instead of only prefix matching at the 
start of the name.
+      version_added: 2.9.0
+      type: boolean
+      example: ~
+      default: "False"
     metrics_allow_list:
       description: |
-        If you want to avoid emitting all the available metrics, you can 
configure an
-        allow list of prefixes (comma separated) to send only the metrics that 
start
-        with the elements of the list (e.g: "scheduler,executor,dagrun")
+        Configure an allow list (comma separated string) to send only certain 
metrics.
+        If metrics_use_pattern_match is false, match only the exact metric 
name prefix.
+        If metrics_use_pattern_match is true, provide regex patterns to match.
       version_added: 2.6.0
       type: string
-      example: ~
+      example: "\"scheduler,executor,dagrun\" or 
\"^scheduler,^executor,heartbeat|timeout\""
       default: ""
     metrics_block_list:
       description: |
-        If you want to avoid emitting all the available metrics, you can 
configure a
-        block list of prefixes (comma separated) to filter out metrics that 
start with
-        the elements of the list (e.g: "scheduler,executor,dagrun").
+        Configure a block list (comma separated string) to block certain 
metrics from being emitted.
         If metrics_allow_list and metrics_block_list are both configured, 
metrics_block_list is ignored.
+        If metrics_use_pattern_match is false, match only the exact metric 
name prefix.
+        If metrics_use_pattern_match is true, provide regex patterns to match.
       version_added: 2.6.0
       type: string
-      example: ~
+      example: "\"scheduler,executor,dagrun\" or 
\"^scheduler,^executor,heartbeat|timeout\""
       default: ""
     statsd_on:
       description: |
diff --git a/airflow/metrics/otel_logger.py b/airflow/metrics/otel_logger.py
index 2668f1ec97..6e88975f1b 100644
--- a/airflow/metrics/otel_logger.py
+++ b/airflow/metrics/otel_logger.py
@@ -35,6 +35,8 @@ from airflow.metrics.protocols import Timer
 from airflow.metrics.validators import (
     OTEL_NAME_MAX_LENGTH,
     AllowListValidator,
+    ListValidator,
+    get_validator,
     stat_name_otel_handler,
 )
 
@@ -166,11 +168,11 @@ class SafeOtelLogger:
         self,
         otel_provider,
         prefix: str = DEFAULT_METRIC_NAME_PREFIX,
-        allow_list_validator=AllowListValidator(),
+        metrics_validator: ListValidator = AllowListValidator(),
     ):
         self.otel: Callable = otel_provider
         self.prefix: str = prefix
-        self.metrics_validator = allow_list_validator
+        self.metrics_validator = metrics_validator
         self.meter = otel_provider.get_meter(__name__)
         self.metrics_map = MetricsMap(self.meter)
 
@@ -393,9 +395,6 @@ def get_otel_logger(cls) -> SafeOtelLogger:
     interval = conf.getint("metrics", "otel_interval_milliseconds", 
fallback=None)  # ex: 30000
     debug = conf.getboolean("metrics", "otel_debugging_on")
 
-    allow_list = conf.get("metrics", "metrics_allow_list", fallback=None)
-    allow_list_validator = AllowListValidator(allow_list)
-
     resource = Resource(attributes={SERVICE_NAME: "Airflow"})
 
     protocol = "https" if ssl_active else "http"
@@ -424,4 +423,4 @@ def get_otel_logger(cls) -> SafeOtelLogger:
         ),
     )
 
-    return SafeOtelLogger(metrics.get_meter_provider(), prefix, 
allow_list_validator)
+    return SafeOtelLogger(metrics.get_meter_provider(), prefix, 
get_validator())
diff --git a/airflow/metrics/statsd_logger.py b/airflow/metrics/statsd_logger.py
index 5e399818ab..5b04c0b95a 100644
--- a/airflow/metrics/statsd_logger.py
+++ b/airflow/metrics/statsd_logger.py
@@ -27,6 +27,7 @@ from airflow.metrics.protocols import Timer
 from airflow.metrics.validators import (
     AllowListValidator,
     BlockListValidator,
+    get_validator,
     validate_stat,
 )
 
@@ -160,8 +161,6 @@ def get_statsd_logger(cls) -> SafeStatsdLogger:
     from statsd import StatsClient
 
     stats_class = conf.getimport("metrics", "statsd_custom_client_path", 
fallback=None)
-    metrics_validator: ListValidator
-
     if stats_class:
         if not issubclass(stats_class, StatsClient):
             raise AirflowConfigException(
@@ -179,17 +178,7 @@ def get_statsd_logger(cls) -> SafeStatsdLogger:
         port=conf.getint("metrics", "statsd_port"),
         prefix=conf.get("metrics", "statsd_prefix"),
     )
-    if conf.get("metrics", "metrics_allow_list", fallback=None):
-        metrics_validator = AllowListValidator(conf.get("metrics", 
"metrics_allow_list"))
-        if conf.get("metrics", "metrics_block_list", fallback=None):
-            log.warning(
-                "Ignoring metrics_block_list as both metrics_allow_list "
-                "and metrics_block_list have been set"
-            )
-    elif conf.get("metrics", "metrics_block_list", fallback=None):
-        metrics_validator = BlockListValidator(conf.get("metrics", 
"metrics_block_list"))
-    else:
-        metrics_validator = AllowListValidator()
+
     influxdb_tags_enabled = conf.getboolean("metrics", 
"statsd_influxdb_enabled", fallback=False)
     metric_tags_validator = BlockListValidator(conf.get("metrics", 
"statsd_disabled_tags", fallback=None))
-    return SafeStatsdLogger(statsd, metrics_validator, influxdb_tags_enabled, 
metric_tags_validator)
+    return SafeStatsdLogger(statsd, get_validator(), influxdb_tags_enabled, 
metric_tags_validator)
diff --git a/airflow/metrics/validators.py b/airflow/metrics/validators.py
index 8bd6dd4476..9fbfa6600d 100644
--- a/airflow/metrics/validators.py
+++ b/airflow/metrics/validators.py
@@ -29,7 +29,7 @@ from typing import Callable, Iterable, Pattern, cast
 import re2
 
 from airflow.configuration import conf
-from airflow.exceptions import InvalidStatsNameException
+from airflow.exceptions import InvalidStatsNameException, 
RemovedInAirflow3Warning
 
 log = logging.getLogger(__name__)
 
@@ -85,6 +85,42 @@ BACK_COMPAT_METRIC_NAME_PATTERNS: set[str] = {
 BACK_COMPAT_METRIC_NAMES: set[Pattern[str]] = {re2.compile(name) for name in 
BACK_COMPAT_METRIC_NAME_PATTERNS}
 
 OTEL_NAME_MAX_LENGTH = 63
+DEFAULT_VALIDATOR_TYPE = "allow"
+
+
+def get_validator() -> ListValidator:
+    validators = {
+        "basic": {"allow": AllowListValidator, "block": BlockListValidator},
+        "pattern": {"allow": PatternAllowListValidator, "block": 
PatternBlockListValidator},
+    }
+    metric_lists = {
+        "allow": (metric_allow_list := conf.get("metrics", 
"metrics_allow_list", fallback=None)),
+        "block": (metric_block_list := conf.get("metrics", 
"metrics_block_list", fallback=None)),
+    }
+
+    use_pattern = conf.getboolean("metrics", "metrics_use_pattern_match", 
fallback=False)
+    validator_type = "pattern" if use_pattern else "basic"
+
+    if not use_pattern:
+        warnings.warn(
+            "The basic metric validator will be deprecated in the future in 
favor of pattern-matching.  "
+            "You can try this now by setting config option 
metrics_use_pattern_match to True.",
+            RemovedInAirflow3Warning,
+            stacklevel=2,
+        )
+
+    if metric_allow_list:
+        list_type = "allow"
+        if metric_block_list:
+            log.warning(
+                "Ignoring metrics_block_list as both metrics_allow_list and 
metrics_block_list have been set."
+            )
+    elif metric_block_list:
+        list_type = "block"
+    else:
+        list_type = DEFAULT_VALIDATOR_TYPE
+
+    return validators[validator_type][list_type](metric_lists[list_type])
 
 
 def validate_stat(fn: Callable) -> Callable:
@@ -221,6 +257,12 @@ class ListValidator(metaclass=abc.ABCMeta):
         """Test if name is allowed."""
         raise NotImplementedError
 
+    def _has_pattern_match(self, name: str) -> bool:
+        for entry in self.validate_list or ():
+            if re2.findall(entry, name.strip().lower()):
+                return True
+        return False
+
 
 class AllowListValidator(ListValidator):
     """AllowListValidator only allows names that match the allowed prefixes."""
@@ -232,6 +274,16 @@ class AllowListValidator(ListValidator):
             return True  # default is all metrics are allowed
 
 
+class PatternAllowListValidator(ListValidator):
+    """Match the provided strings anywhere in the metric name."""
+
+    def test(self, name: str) -> bool:
+        if self.validate_list is not None:
+            return super()._has_pattern_match(name)
+        else:
+            return True  # default is all metrics are allowed
+
+
 class BlockListValidator(ListValidator):
     """BlockListValidator only allows names that do not match the blocked 
prefixes."""
 
@@ -240,3 +292,13 @@ class BlockListValidator(ListValidator):
             return not name.strip().lower().startswith(self.validate_list)
         else:
             return True  # default is all metrics are allowed
+
+
+class PatternBlockListValidator(ListValidator):
+    """Only allow names that do not match the blocked strings."""
+
+    def test(self, name: str) -> bool:
+        if self.validate_list is not None:
+            return not super()._has_pattern_match(name)
+        else:
+            return True  # default is all metrics are allowed
diff --git a/tests/core/test_stats.py b/tests/core/test_stats.py
index 49ea0db800..31e4559600 100644
--- a/tests/core/test_stats.py
+++ b/tests/core/test_stats.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import importlib
+import logging
 import re
 from unittest import mock
 from unittest.mock import Mock
@@ -29,7 +30,12 @@ import airflow
 from airflow.exceptions import AirflowConfigException, 
InvalidStatsNameException
 from airflow.metrics.datadog_logger import SafeDogStatsdLogger
 from airflow.metrics.statsd_logger import SafeStatsdLogger
-from airflow.metrics.validators import AllowListValidator, BlockListValidator
+from airflow.metrics.validators import (
+    AllowListValidator,
+    BlockListValidator,
+    PatternAllowListValidator,
+    PatternBlockListValidator,
+)
 from tests.test_utils.config import conf_vars
 
 
@@ -265,40 +271,128 @@ class TestDogStats:
         importlib.reload(airflow.stats)
 
 
-class TestStatsWithAllowList:
-    def setup_method(self):
-        self.statsd_client = Mock(spec=statsd.StatsClient)
-        self.stats = SafeStatsdLogger(self.statsd_client, 
AllowListValidator("stats_one, stats_two"))
+class TestStatsAllowAndBlockLists:
+    @pytest.mark.parametrize(
+        "validator, stat_name, expect_incr",
+        [
+            (PatternAllowListValidator, "stats_one", True),
+            (PatternAllowListValidator, "stats_two.bla", True),
+            (PatternAllowListValidator, "stats_three.foo", True),
+            (PatternAllowListValidator, "stats_foo_three", True),
+            (PatternAllowListValidator, "stats_three", False),
+            (AllowListValidator, "stats_one", True),
+            (AllowListValidator, "stats_two.bla", True),
+            (AllowListValidator, "stats_three.foo", False),
+            (AllowListValidator, "stats_foo_three", False),
+            (AllowListValidator, "stats_three", False),
+            (PatternBlockListValidator, "stats_one", False),
+            (PatternBlockListValidator, "stats_two.bla", False),
+            (PatternBlockListValidator, "stats_three.foo", False),
+            (PatternBlockListValidator, "stats_foo_three", False),
+            (PatternBlockListValidator, "stats_foo", False),
+            (PatternBlockListValidator, "stats_three", True),
+            (BlockListValidator, "stats_one", False),
+            (BlockListValidator, "stats_two.bla", False),
+            (BlockListValidator, "stats_three.foo", True),
+            (BlockListValidator, "stats_foo_three", True),
+            (BlockListValidator, "stats_three", True),
+        ],
+    )
+    def test_allow_and_block_list(self, validator, stat_name, expect_incr):
+        statsd_client = Mock(spec=statsd.StatsClient)
+        stats = SafeStatsdLogger(statsd_client, validator("stats_one, 
stats_two, foo"))
+
+        stats.incr(stat_name)
+
+        if expect_incr:
+            statsd_client.incr.assert_called_once_with(stat_name, 1, 1)
+        else:
+            statsd_client.assert_not_called()
+
+    @pytest.mark.parametrize(
+        "match_pattern, expect_incr",
+        [
+            ("^stat", True),
+            ("a.{4}o", True),
+            ("^banana", False),
+        ],
+    )
+    def test_regex_matches(self, match_pattern, expect_incr):
+        stat_name = "stats_foo_one"
+        validator = PatternAllowListValidator
 
-    def test_increment_counter_with_allowed_key(self):
-        self.stats.incr("stats_one")
-        self.statsd_client.incr.assert_called_once_with("stats_one", 1, 1)
+        statsd_client = Mock(spec=statsd.StatsClient)
+        stats = SafeStatsdLogger(statsd_client, validator(match_pattern))
 
-    def test_increment_counter_with_allowed_prefix(self):
-        self.stats.incr("stats_two.bla")
-        self.statsd_client.incr.assert_called_once_with("stats_two.bla", 1, 1)
+        stats.incr(stat_name)
 
-    def test_not_increment_counter_if_not_allowed(self):
-        self.stats.incr("stats_three")
-        self.statsd_client.assert_not_called()
+        if expect_incr:
+            statsd_client.incr.assert_called_once_with(stat_name, 1, 1)
+        else:
+            statsd_client.assert_not_called()
 
 
-class TestStatsWithBlockList:
-    def setup_method(self):
-        self.statsd_client = Mock(spec=statsd.StatsClient)
-        self.stats = SafeStatsdLogger(self.statsd_client, 
BlockListValidator("stats_one, stats_two"))
+class TestPatternOrBasicValidatorConfigOption:
+    def teardown_method(self):
+        # Avoid side-effects
+        importlib.reload(airflow.stats)
 
-    def test_increment_counter_with_allowed_key(self):
-        self.stats.incr("stats_one")
-        self.statsd_client.assert_not_called()
+    stats_on = {("metrics", "statsd_on"): "True"}
+    pattern_on = {("metrics", "metrics_use_pattern_match"): "True"}
+    pattern_off = {("metrics", "metrics_use_pattern_match"): "False"}
+    allow_list = {("metrics", "metrics_allow_list"): "foo,bar"}
+    block_list = {("metrics", "metrics_block_list"): "foo,bar"}
+
+    @pytest.mark.parametrize(
+        "config, expected",
+        [
+            pytest.param(
+                {**stats_on, **pattern_on},
+                PatternAllowListValidator,
+                id="pattern_allow_by_default",
+            ),
+            pytest.param(
+                stats_on,
+                AllowListValidator,
+                id="basic_allow_by_default",
+            ),
+            pytest.param(
+                {**stats_on, **pattern_on, **allow_list},
+                PatternAllowListValidator,
+                id="pattern_allow_list_provided",
+            ),
+            pytest.param(
+                {**stats_on, **pattern_off, **allow_list},
+                AllowListValidator,
+                id="basic_allow_list_provided",
+            ),
+            pytest.param(
+                {**stats_on, **pattern_on, **block_list},
+                PatternBlockListValidator,
+                id="pattern_block_list_provided",
+            ),
+            pytest.param(
+                {**stats_on, **block_list},
+                BlockListValidator,
+                id="basic_block_list_provided",
+            ),
+        ],
+    )
+    def test_pattern_or_basic_picker(self, config, expected):
+        with conf_vars(config):
+            importlib.reload(airflow.stats)
 
-    def test_increment_counter_with_allowed_prefix(self):
-        self.stats.incr("stats_two.bla")
-        self.statsd_client.assert_not_called()
+            assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient)
+            assert type(airflow.stats.Stats.instance.metrics_validator) == 
expected
 
-    def test_not_increment_counter_if_not_allowed(self):
-        self.stats.incr("stats_three")
-        self.statsd_client.incr.assert_called_once_with("stats_three", 1, 1)
+    @conf_vars({**stats_on, **block_list, ("metrics", "metrics_allow_list"): 
"bax,qux"})
+    def test_setting_allow_and_block_logs_warning(self, caplog):
+        importlib.reload(airflow.stats)
+
+        assert isinstance(airflow.stats.Stats.statsd, statsd.StatsClient)
+        assert type(airflow.stats.Stats.instance.metrics_validator) == 
AllowListValidator
+        with caplog.at_level(logging.WARNING):
+            assert "Ignoring metrics_block_list" in caplog.text
 
 
 class TestDogStatsWithAllowList:

Reply via email to