This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 38c50f5b54e Explicitly initialize Task SDK Stats in API server lifespan (hardening; backport fix for 3.2.x) (#68078)
38c50f5b54e is described below
commit 38c50f5b54ed77e125aa95e3005c7bc8e7ad361e
Author: Diogo Silva <[email protected]>
AuthorDate: Fri Jun 12 19:19:16 2026 +0100
Explicitly initialize Task SDK Stats in API server lifespan (hardening; backport fix for 3.2.x) (#68078)
* Initialize Task SDK Stats in the API server so Edge Worker metrics are emitted
The API server serves the Edge Worker REST API (/edge_worker/v1/...), whose
heartbeat handler records edge_worker.* metrics through the Task SDK Stats
singleton (resolved by the Edge provider via airflow.providers.common.compat).
Unlike the scheduler, triggerer, dag-processor, executors, and task runner,
the API server never called Stats.initialize(...). After the auto-initializing
Stats was removed in #63932, that singleton stays a NoStatsLogger in the API
server process, so every Edge Worker metric is silently dropped.
Initialize the Task SDK Stats singleton from the FastAPI lifespan (which runs
once per worker, post-fork), mirroring the existing initialization in
serde/task_runner. The call is guarded so a metrics misconfiguration can never
block API server startup.
Closes: #68077
Signed-off-by: Diogo Silva <[email protected]>
* Add newsfragment for #68078
Signed-off-by: Diogo Silva <[email protected]>
* fix: fix ruff errors
* Remove newsfragment — bug fix does not require a user-facing changelog entry
* Make API server Stats init log and docstring metric-agnostic
* Update airflow-core/src/airflow/api_fastapi/app.py
Co-authored-by: Jens Scheffler <[email protected]>
* Update airflow-core/tests/unit/api_fastapi/test_app.py
Co-authored-by: Jens Scheffler <[email protected]>
---------
Signed-off-by: Diogo Silva <[email protected]>
Co-authored-by: Jens Scheffler <[email protected]>
---
airflow-core/src/airflow/api_fastapi/app.py | 24 ++++++++++++++
airflow-core/tests/unit/api_fastapi/test_app.py | 43 +++++++++++++++++++++++++
2 files changed, 67 insertions(+)
diff --git a/airflow-core/src/airflow/api_fastapi/app.py
b/airflow-core/src/airflow/api_fastapi/app.py
index 8931840c880..45b64dc36f7 100644
--- a/airflow-core/src/airflow/api_fastapi/app.py
+++ b/airflow-core/src/airflow/api_fastapi/app.py
@@ -71,8 +71,32 @@ class _AuthManagerState:
_lock = threading.Lock()
+def _initialize_task_sdk_stats() -> None:
+ """
+ Initialize the Task SDK ``Stats`` singleton in the API server process.
+
+ Initialization is guarded so a metrics misconfiguration can never prevent
the API server
+ from starting.
+ """
+ try:
+ from airflow._shared.observability.metrics import stats
+ from airflow.observability.metrics import stats_utils
+
+ stats.initialize(
+ factory=stats_utils.get_stats_factory(),
+ export_legacy_names=conf.getboolean("metrics", "legacy_names_on"),
+ )
+ except Exception:
+ log.warning(
+ "Failed to initialize Task SDK Stats in the API server; metrics
emitted through the "
+ "Task SDK Stats singleton will not be recorded.",
+ exc_info=True,
+ )
+
+
@asynccontextmanager
async def lifespan(app: FastAPI):
+ _initialize_task_sdk_stats()
async with AsyncExitStack() as stack:
for route in app.routes:
if isinstance(route, Mount) and isinstance(route.app, FastAPI):
diff --git a/airflow-core/tests/unit/api_fastapi/test_app.py
b/airflow-core/tests/unit/api_fastapi/test_app.py
index 1e8817ef243..1633e11de4e 100644
--- a/airflow-core/tests/unit/api_fastapi/test_app.py
+++ b/airflow-core/tests/unit/api_fastapi/test_app.py
@@ -168,3 +168,46 @@ def test_create_auth_manager_thread_safety():
assert call_count == 1
app_module.purge_cached_app()
+
+
+class TestInitializeTaskSdkStats:
+ """
+ Ensure that stats subsystem is properly initialized in API server.
+ """
+
+ def test_initializes_task_sdk_stats_with_factory(self):
+ """It initializes the Task SDK Stats singleton using the configured
factory."""
+ sentinel_factory = object()
+ with (
+ mock.patch("airflow._shared.observability.metrics.stats") as
mock_stats,
+ mock.patch(
+ "airflow.observability.metrics.stats_utils.get_stats_factory",
+ return_value=sentinel_factory,
+ ) as mock_get_factory,
+ ):
+ app_module._initialize_task_sdk_stats()
+
+ mock_get_factory.assert_called_once_with()
+ mock_stats.initialize.assert_called_once()
+ _, kwargs = mock_stats.initialize.call_args
+ assert kwargs["factory"] is sentinel_factory
+ assert isinstance(kwargs["export_legacy_names"], bool)
+
+ def test_stats_failure_does_not_block_startup(self, caplog):
+ """A metrics misconfiguration must not prevent the API server from
starting."""
+ with (
+ mock.patch("airflow._shared.observability.metrics.stats") as
mock_stats,
+
mock.patch("airflow.observability.metrics.stats_utils.get_stats_factory"),
+ ):
+ mock_stats.initialize.side_effect = RuntimeError("boom")
+
+ # Must not raise.
+ app_module._initialize_task_sdk_stats()
+
+ assert any("Failed to initialize Task SDK Stats" in rec.message for
rec in caplog.records)
+
+ def test_stats_initialized_during_lifespan(self, client):
+ """_initialize_task_sdk_stats must be called as part of the app
lifespan, not just defined."""
+ with mock.patch.object(app_module, "_initialize_task_sdk_stats") as
mock_init:
+ with client():
+ mock_init.assert_called_once()