This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 38c50f5b54e Explicitly initialize Task SDK Stats in API server lifespan (hardening; backport fix for 3.2.x) (#68078)
38c50f5b54e is described below

commit 38c50f5b54ed77e125aa95e3005c7bc8e7ad361e
Author: Diogo Silva <[email protected]>
AuthorDate: Fri Jun 12 19:19:16 2026 +0100

    Explicitly initialize Task SDK Stats in API server lifespan (hardening; backport fix for 3.2.x) (#68078)
    
    * Initialize Task SDK Stats in the API server so Edge Worker metrics are emitted
    
    The API server serves the Edge Worker REST API (/edge_worker/v1/...) whose
    heartbeat handler records edge_worker.* metrics through the Task SDK Stats
    singleton (resolved by the Edge provider via airflow.providers.common.compat).
    
    Unlike the scheduler, triggerer, dag-processor, executors and task runner, the
    API server never called Stats.initialize(...). After the auto-initializing Stats
    was removed in #63932, that singleton stays a NoStatsLogger in the API server
    process and every Edge Worker metric is silently dropped.
    
    Initialize the Task SDK Stats singleton from the FastAPI lifespan (runs once per
    worker, post-fork), mirroring the existing init in serde/task_runner. The call is
    guarded so a metrics misconfiguration can never block API server startup.
    
    Closes: #68077
    Signed-off-by: Diogo Silva <[email protected]>
    
    * Add newsfragment for #68078
    
    Signed-off-by: Diogo Silva <[email protected]>
    
    * fix: fix ruff errors
    
    * Remove newsfragment — bug fix does not require a user-facing changelog entry
    
    * Make API server Stats init log and docstring metric-agnostic
    
    * Update airflow-core/src/airflow/api_fastapi/app.py
    
    Co-authored-by: Jens Scheffler <[email protected]>
    
    * Update airflow-core/tests/unit/api_fastapi/test_app.py
    
    Co-authored-by: Jens Scheffler <[email protected]>
    
    ---------
    
    Signed-off-by: Diogo Silva <[email protected]>
    Co-authored-by: Jens Scheffler <[email protected]>
---
 airflow-core/src/airflow/api_fastapi/app.py     | 24 ++++++++++++++
 airflow-core/tests/unit/api_fastapi/test_app.py | 43 +++++++++++++++++++++++++
 2 files changed, 67 insertions(+)

diff --git a/airflow-core/src/airflow/api_fastapi/app.py b/airflow-core/src/airflow/api_fastapi/app.py
index 8931840c880..45b64dc36f7 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -71,8 +71,32 @@ class _AuthManagerState:
     _lock = threading.Lock()
 
 
+def _initialize_task_sdk_stats() -> None:
+    """
+    Initialize the Task SDK ``Stats`` singleton in the API server process.
+
+    Initialization is guarded so a metrics misconfiguration can never prevent the API server
+    from starting.
+    """
+    try:
+        from airflow._shared.observability.metrics import stats
+        from airflow.observability.metrics import stats_utils
+
+        stats.initialize(
+            factory=stats_utils.get_stats_factory(),
+            export_legacy_names=conf.getboolean("metrics", "legacy_names_on"),
+        )
+    except Exception:
+        log.warning(
+            "Failed to initialize Task SDK Stats in the API server; metrics emitted through the "
+            "Task SDK Stats singleton will not be recorded.",
+            exc_info=True,
+        )
+
+
 @asynccontextmanager
 async def lifespan(app: FastAPI):
+    _initialize_task_sdk_stats()
     async with AsyncExitStack() as stack:
         for route in app.routes:
             if isinstance(route, Mount) and isinstance(route.app, FastAPI):
diff --git a/airflow-core/tests/unit/api_fastapi/test_app.py b/airflow-core/tests/unit/api_fastapi/test_app.py
index 1e8817ef243..1633e11de4e 100644
--- a/airflow-core/tests/unit/api_fastapi/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/test_app.py
@@ -168,3 +168,46 @@ def test_create_auth_manager_thread_safety():
     assert call_count == 1
 
     app_module.purge_cached_app()
+
+
+class TestInitializeTaskSdkStats:
+    """
+    Ensure that stats subsystem is properly initialized in API server.
+    """
+
+    def test_initializes_task_sdk_stats_with_factory(self):
+        """It initializes the Task SDK Stats singleton using the configured factory."""
+        sentinel_factory = object()
+        with (
+            mock.patch("airflow._shared.observability.metrics.stats") as mock_stats,
+            mock.patch(
+                "airflow.observability.metrics.stats_utils.get_stats_factory",
+                return_value=sentinel_factory,
+            ) as mock_get_factory,
+        ):
+            app_module._initialize_task_sdk_stats()
+
+            mock_get_factory.assert_called_once_with()
+            mock_stats.initialize.assert_called_once()
+            _, kwargs = mock_stats.initialize.call_args
+            assert kwargs["factory"] is sentinel_factory
+            assert isinstance(kwargs["export_legacy_names"], bool)
+
+    def test_stats_failure_does_not_block_startup(self, caplog):
+        """A metrics misconfiguration must not prevent the API server from starting."""
+        with (
+            mock.patch("airflow._shared.observability.metrics.stats") as mock_stats,
+            mock.patch("airflow.observability.metrics.stats_utils.get_stats_factory"),
+        ):
+            mock_stats.initialize.side_effect = RuntimeError("boom")
+
+            # Must not raise.
+            app_module._initialize_task_sdk_stats()
+
+        assert any("Failed to initialize Task SDK Stats" in rec.message for rec in caplog.records)
+
+    def test_stats_initialized_during_lifespan(self, client):
+        """_initialize_task_sdk_stats must be called as part of the app lifespan, not just defined."""
+        with mock.patch.object(app_module, "_initialize_task_sdk_stats") as mock_init:
+            with client():
+                mock_init.assert_called_once()

Reply via email to