amoghrajesh commented on code in PR #62127:
URL: https://github.com/apache/airflow/pull/62127#discussion_r2827465093


##########
task-sdk/src/airflow/sdk/observability/metrics/statsd_logger.py:
##########
@@ -29,7 +29,7 @@
 log = logging.getLogger(__name__)
 
 
-def get_statsd_logger(cls) -> SafeStatsdLogger:
+def get_statsd_logger() -> SafeStatsdLogger:

Review Comment:
   Should the corresponding change be made in core too?



##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -40,6 +40,7 @@
 
 from airflow.dag_processing.bundles.base import BaseDagBundle, BundleVersionLock
 from airflow.dag_processing.bundles.manager import DagBundlesManager
+from airflow.observability.metrics import stats_utils

Review Comment:
   ```suggestion
   from airflow.sdk.observability.metrics import stats_utils
   ```



##########
shared/observability/tests/observability/metrics/test_stats.py:
##########
@@ -102,127 +92,98 @@ def test_decr(self):
 
     def test_enabled_by_config(self):
         """Test that enabling this sets the right instance properties"""
-        with conf_vars({("metrics", "statsd_on"): "True"}):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=lambda: statsd_logger.get_statsd_logger(
+                stats_class=statsd.StatsClient,
+                host="localhost",
+                port="1234",
+                prefix="airflow",
             )
-            assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
statsd.StatsClient)
-            assert not 
hasattr(airflow_shared.observability.metrics.stats.Stats, "dogstatsd")
+        )
+        assert isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, statsd.StatsClient)
+        assert not hasattr(airflow_shared.observability.metrics.stats.Stats, "dogstatsd")
         # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
     def test_load_custom_statsd_client(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "statsd_custom_client_path"): 
f"{__name__}.CustomStatsd",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert 
isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, 
CustomStatsd)
-        # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
-
-    def test_load_invalid_custom_stats_client(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "statsd_custom_client_path"): 
f"{__name__}.InvalidCustomStatsd",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            error_message = re.escape(
-                "Your custom StatsD client must extend the statsd."
-                "StatsClient in order to ensure backwards compatibility."
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=lambda: statsd_logger.get_statsd_logger(
+                stats_class=CustomStatsd,
+                host="localhost",
+                port="1234",
+                prefix="airflow",
             )
-            # we assert for Exception here instead of AirflowConfigException 
to not import from shared configuration
-            with pytest.raises(Exception, match=error_message):
-                
airflow_shared.observability.metrics.stats.Stats.incr("empty_key")
+        )
+        assert isinstance(airflow_shared.observability.metrics.stats.Stats.statsd, CustomStatsd)
+        # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
     def test_load_allow_list_validator(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "metrics_allow_list"): "name1,name2",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert isinstance(
-                
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
-                PatternAllowListValidator,
-            )
-            assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
-                "name1",
-                "name2",
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=lambda: statsd_logger.get_statsd_logger(
+                stats_class=statsd.StatsClient,
+                host="localhost",
+                port="1234",
+                prefix="airflow",
+                metrics_allow_list="name1,name2",
             )
+        )
+        assert isinstance(
+            airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+            PatternAllowListValidator,
+        )
+        assert airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list == (
+            "name1",
+            "name2",
+        )
         # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
     def test_load_block_list_validator(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "metrics_block_list"): "name1,name2",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert isinstance(
-                
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
-                PatternBlockListValidator,
-            )
-            assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
-                "name1",
-                "name2",
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=lambda: statsd_logger.get_statsd_logger(
+                stats_class=statsd.StatsClient,
+                host="localhost",
+                port="1234",
+                prefix="airflow",
+                metrics_block_list="name1,name2",
             )
+        )
+        assert isinstance(
+            airflow_shared.observability.metrics.stats.Stats.metrics_validator,
+            PatternBlockListValidator,
+        )
+        assert airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list == (
+            "name1",
+            "name2",
+        )
         # Avoid side-effects
         importlib.reload(airflow_shared.observability.metrics.stats)
 
     def test_load_allow_and_block_list_validator_loads_only_allow_list_validator(self):
-        with conf_vars(
-            {
-                ("metrics", "statsd_on"): "True",
-                ("metrics", "metrics_allow_list"): "name1,name2",
-                ("metrics", "metrics_block_list"): "name1,name2",
-            }
-        ):
-            importlib.reload(airflow_shared.observability.metrics.stats)
-            airflow_shared.observability.metrics.stats.Stats.initialize(
-                is_statsd_datadog_enabled=False,
-                is_statsd_on=True,
-                is_otel_on=False,
-            )
-            assert isinstance(
-                
airflow_shared.observability.metrics.stats.Stats.metrics_validator,
-                PatternAllowListValidator,
-            )
-            assert 
airflow_shared.observability.metrics.stats.Stats.metrics_validator.validate_list
 == (
-                "name1",
-                "name2",
+        importlib.reload(airflow_shared.observability.metrics.stats)
+        airflow_shared.observability.metrics.stats.Stats.initialize(
+            factory=lambda: statsd_logger.get_statsd_logger(
+                stats_class=statsd.StatsClient,
+                host="localhost",
+                port="1234",
+                prefix="airflow",
+                metrics_allow_list="name1,name2",
+                metrics_block_list="name1,name2",
             )
+        )

Review Comment:
   Since this setup is repeated in many tests, could we extract it into a helper method?



##########
task-sdk/src/airflow/sdk/observability/metrics/otel_logger.py:
##########
@@ -25,7 +25,7 @@
     from airflow.sdk._shared.observability.metrics.otel_logger import SafeOtelLogger
 
 
-def get_otel_logger(cls) -> SafeOtelLogger:
+def get_otel_logger() -> SafeOtelLogger:

Review Comment:
   Should the corresponding change be made in core too?



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -271,11 +272,8 @@ def run(self):
         # Related: https://github.com/apache/airflow/pull/57459
         os.environ["_AIRFLOW_PROCESS_CONTEXT"] = "server"
 
-        Stats.initialize(
-            is_statsd_datadog_enabled=conf.getboolean("metrics", "statsd_datadog_enabled"),
-            is_statsd_on=conf.getboolean("metrics", "statsd_on"),
-            is_otel_on=conf.getboolean("metrics", "otel_on"),
-        )
+        stats_factory = stats_utils.get_stats_factory(Stats)
+        Stats.initialize(factory=stats_factory)

Review Comment:
   This might not be a breaking change, since it hasn't been released with 3.2 yet, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to