amoghrajesh commented on code in PR #62127:
URL: https://github.com/apache/airflow/pull/62127#discussion_r2827465093
##########
task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py:
##########
@@ -29,7 +29,7 @@
log = logging.getLogger(__name__)
-def get_statsd_logger(cls) -> SafeStatsdLogger:
+def get_statsd_logger() -> SafeStatsdLogger:
Review Comment:
Should the corresponding change be made in core too?
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -40,6 +40,7 @@
from airflow.dag_processing.bundles.base import BaseDagBundle,
BundleVersionLock
from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.observability.metrics import stats_utils
Review Comment:
```suggestion
from airflow.sdk.observability.metrics import stats_utils
```
##########
shared/observability/tests/observability/metrics/test_stats.py:
##########
@@ -102,127 +92,98 @@ def test_decr(self):
def test_enabled_by_config(self):
"""Test that enabling this sets the right instance properties"""
- with conf_vars({("metrics", "statsd_on"): "True"}):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=lambda: statsd_logger.get_statsd_logger(
+ stats_class=statsd.StatsClient,
+ host="localhost",
+ port="1234",
+ prefix="airflow",
)
- assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
statsd.StatsClient)
- assert not
hasattr(airflow_shared.observability.metrics.stats.Stats, "dogstatsd")
+ )
+ assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
statsd.StatsClient)
+ assert not hasattr(airflow_shared.observability.metrics.stats.Stats,
"dogstatsd")
# Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
def test_load_custom_statsd_client(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "statsd_custom_client_path"):
f"{__name__}.CustomStatsd",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
CustomStatsd)
- # Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
-
- def test_load_invalid_custom_stats_client(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "statsd_custom_client_path"):
f"{__name__}.InvalidCustomStatsd",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- error_message = re.escape(
- "Your custom StatsD client must extend the statsd."
- "StatsClient in order to ensure backwards compatibility."
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=lambda: statsd_logger.get_statsd_logger(
+ stats_class=CustomStatsd,
+ host="localhost",
+ port="1234",
+ prefix="airflow",
)
- # we assert for Exception here instead of AirflowConfigException
to not import from shared configuration
- with pytest.raises(Exception, match=error_message):
-
airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
+ )
+ assert
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd,
CustomStatsd)
+ # Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
def test_load_allow_list_validator(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "metrics_allow_list"): "name1,name2",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert isinstance(
-
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
- PatternAllowListValidator,
- )
- assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
- "name1",
- "name2",
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=lambda: statsd_logger.get_statsd_logger(
+ stats_class=statsd.StatsClient,
+ host="localhost",
+ port="1234",
+ prefix="airflow",
+ metrics_allow_list="name1,name2",
)
+ )
+ assert isinstance(
+ airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+ PatternAllowListValidator,
+ )
+ assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
+ "name1",
+ "name2",
+ )
# Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
def test_load_block_list_validator(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "metrics_block_list"): "name1,name2",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert isinstance(
-
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
- PatternBlockListValidator,
- )
- assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
- "name1",
- "name2",
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=lambda: statsd_logger.get_statsd_logger(
+ stats_class=statsd.StatsClient,
+ host="localhost",
+ port="1234",
+ prefix="airflow",
+ metrics_block_list="name1,name2",
)
+ )
+ assert isinstance(
+ airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+ PatternBlockListValidator,
+ )
+ assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
+ "name1",
+ "name2",
+ )
# Avoid side-effects
importlib.reload(airflow_shared.observability.metrics.stats)
def
test_load_allow_and_block_list_validator_loads_only_allow_list_validator(self):
- with conf_vars(
- {
- ("metrics", "statsd_on"): "True",
- ("metrics", "metrics_allow_list"): "name1,name2",
- ("metrics", "metrics_block_list"): "name1,name2",
- }
- ):
- importlib.reload(airflow_shared.observability.metrics.stats)
- airflow_shared.observability.metrics.stats.Stats.initialize(
- is_statsd_datadog_enabled=False,
- is_statsd_on=True,
- is_otel_on=False,
- )
- assert isinstance(
-
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
- PatternAllowListValidator,
- )
- assert
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
== (
- "name1",
- "name2",
+ importlib.reload(airflow_shared.observability.metrics.stats)
+ airflow_shared.observability.metrics.stats.Stats.initialize(
+ factory=lambda: statsd_logger.get_statsd_logger(
+ stats_class=statsd.StatsClient,
+ host="localhost",
+ port="1234",
+ prefix="airflow",
+ metrics_allow_list="name1,name2",
+ metrics_block_list="name1,name2",
)
+ )
Review Comment:
Since this setup is repeated a lot, could we extract a helper test method for it?
##########
task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py:
##########
@@ -25,7 +25,7 @@
from airflow.sdk._shared.observability.metrics.otel_logger import
SafeOtelLogger
-def get_otel_logger(cls) -> SafeOtelLogger:
+def get_otel_logger() -> SafeOtelLogger:
Review Comment:
Should the corresponding change be made in core too?
##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -271,11 +272,8 @@ def run(self):
# Related: https://github.com/apache/airflow/pull/57459
os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
- Stats.initialize(
- is_statsd_datadog_enabled=conf.getboolean("metrics",
"statsd_datadog_enabled"),
- is_statsd_on=conf.getboolean("metrics", "statsd_on"),
- is_otel_on=conf.getboolean("metrics", "otel_on"),
- )
+ stats_factory = stats_utils.get_stats_factory(Stats)
+ Stats.initialize(factory=stats_factory)
Review Comment:
This might not be a breaking change, since it hasn't been released with 3.2
yet, right?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]