jason810496 commented on code in PR #64523: URL: https://github.com/apache/airflow/pull/64523#discussion_r3341311500
########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) +_ROUTE_PATHS_BY_ROUTER_ID: dict[int, dict[object, str]] = {} + + +def _get_api_surface(path: str) -> str | None: + for prefix, surface in _API_PATH_PREFIX_TO_SURFACE: + if path == prefix or path.startswith(f"{prefix}/"): Review Comment: IIUC, having `path.startswith(prefix)` would be enough. ```suggestion path.startswith(prefix) ``` ########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) +_ROUTE_PATHS_BY_ROUTER_ID: dict[int, dict[object, str]] = {} + + +def _get_api_surface(path: str) -> str | None: + for prefix, surface in _API_PATH_PREFIX_TO_SURFACE: + if path == prefix or path.startswith(f"{prefix}/"): + return surface + return None + + +def _get_status_family(status_code: int) -> str: + return f"{status_code // 100}xx" + + +def _get_route_tag(scope: Scope) -> str: + route = scope.get("route") + route_path = getattr(route, "path", None) + if isinstance(route_path, str) and route_path: + return route_path + + router = scope.get("router") + endpoint = scope.get("endpoint") + if router is not None and endpoint is not None: + route_paths = _ROUTE_PATHS_BY_ROUTER_ID.get(id(router)) + if route_paths is None: + route_paths = { + candidate_endpoint: candidate_route_path + for 
candidate_route in getattr(router, "routes", ()) + for candidate_endpoint, candidate_route_path in [ + ( + getattr(candidate_route, "endpoint", None), + getattr(candidate_route, "path", None), + ) + ] + if candidate_endpoint is not None + and isinstance(candidate_route_path, str) + and candidate_route_path + } + _ROUTE_PATHS_BY_ROUTER_ID[id(router)] = route_paths + + endpoint_route_path = route_paths.get(endpoint) + if isinstance(endpoint_route_path, str) and endpoint_route_path: + return endpoint_route_path + + return "unmatched" + + +def _emit_api_metrics( + *, + scope: Scope, + path: str, + method: str, + status_code: int, + duration_us: int, +) -> None: + api_surface = _get_api_surface(path) + if api_surface is None: + return + + # Keep tags bounded so API metrics remain usable across supported backends. + base_tags = { + "api_surface": api_surface, + "method": method or "UNKNOWN", + "route": _get_route_tag(scope), + } + status_family = _get_status_family(status_code) + request_tags = { + **base_tags, + "status_family": status_family, + } + duration_ms = duration_us / 1000.0 + + Stats.incr("api.requests", tags=request_tags) + Stats.timing("api.request.duration", duration_ms, tags=request_tags) + if status_code >= 500: + Stats.incr("api.request.errors", tags=base_tags) + + +class HttpMetricsMiddleware: + """ + Emit REST API metrics for completed HTTP requests. + + Health-check paths are excluded to avoid metric noise. 
+ """ + + def __init__(self, app: ASGIApp) -> None: + self.app = app + + async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: + if scope["type"] != "http": + await self.app(scope, receive, send) + return + + start = time.monotonic_ns() + response: Message | None = None + + async def capture_send(message: Message) -> None: + nonlocal response + if message["type"] == "http.response.start": + response = message + await send(message) + + try: + await self.app(scope, receive, capture_send) + except Exception: + if response is None: + response = {"status": 500} + raise + finally: + path = scope["path"] + if path not in HEALTH_PATHS: + duration_us = (time.monotonic_ns() - start) // 1000 + status = response["status"] if response is not None else 0 + method = scope.get("method", "") Review Comment: ```suggestion method = scope.get("method", "UNKNOWN") ``` ########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) +_ROUTE_PATHS_BY_ROUTER_ID: dict[int, dict[object, str]] = {} + + +def _get_api_surface(path: str) -> str | None: + for prefix, surface in _API_PATH_PREFIX_TO_SURFACE: + if path == prefix or path.startswith(f"{prefix}/"): + return surface + return None + + +def _get_status_family(status_code: int) -> str: + return f"{status_code // 100}xx" + + +def _get_route_tag(scope: Scope) -> str: + route = scope.get("route") + route_path = getattr(route, "path", None) + if isinstance(route_path, str) and route_path: + return route_path + + router = scope.get("router") + endpoint = scope.get("endpoint") + if router is not None and endpoint is not None: + route_paths = _ROUTE_PATHS_BY_ROUTER_ID.get(id(router)) + if route_paths is None: + route_paths = { + candidate_endpoint: candidate_route_path + for candidate_route in getattr(router, "routes", ()) + for candidate_endpoint, candidate_route_path in [ + ( + getattr(candidate_route, "endpoint", None), + getattr(candidate_route, "path", None), + ) + ] + if candidate_endpoint is not None + and isinstance(candidate_route_path, str) + and candidate_route_path + } + _ROUTE_PATHS_BY_ROUTER_ID[id(router)] = route_paths + + endpoint_route_path = route_paths.get(endpoint) + if isinstance(endpoint_route_path, str) and endpoint_route_path: + return endpoint_route_path + + return "unmatched" + + +def _emit_api_metrics( + *, + scope: Scope, + path: str, + method: str, + status_code: int, + duration_us: int, +) -> None: + api_surface = 
_get_api_surface(path) + if api_surface is None: + return Review Comment: Perhaps we could keep `api_surface` nullable instead of returning early. Routers registered by plugins could also benefit from this feature. ########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) +_ROUTE_PATHS_BY_ROUTER_ID: dict[int, dict[object, str]] = {} + + +def _get_api_surface(path: str) -> str | None: + for prefix, surface in _API_PATH_PREFIX_TO_SURFACE: + if path == prefix or path.startswith(f"{prefix}/"): + return surface + return None + + +def _get_status_family(status_code: int) -> str: + return f"{status_code // 100}xx" + + +def _get_route_tag(scope: Scope) -> str: + route = scope.get("route") + route_path = getattr(route, "path", None) + if isinstance(route_path, str) and route_path: + return route_path + + router = scope.get("router") + endpoint = scope.get("endpoint") + if router is not None and endpoint is not None: + route_paths = _ROUTE_PATHS_BY_ROUTER_ID.get(id(router)) + if route_paths is None: + route_paths = { + candidate_endpoint: candidate_route_path + for candidate_route in getattr(router, "routes", ()) + for candidate_endpoint, candidate_route_path in [ + ( + getattr(candidate_route, "endpoint", None), + getattr(candidate_route, "path", None), + ) + ] + if candidate_endpoint is not None + and isinstance(candidate_route_path, str) + and candidate_route_path + } + _ROUTE_PATHS_BY_ROUTER_ID[id(router)] = route_paths + + endpoint_route_path = route_paths.get(endpoint) + if isinstance(endpoint_route_path, str) and endpoint_route_path: + return endpoint_route_path + + return "unmatched" + + +def _emit_api_metrics( + *, + scope: Scope, + path: str, + method: str, + status_code: int, + duration_us: int, +) -> None: + api_surface = 
_get_api_surface(path) + if api_surface is None: + return + + # Keep tags bounded so API metrics remain usable across supported backends. + base_tags = { + "api_surface": api_surface, + "method": method or "UNKNOWN", Review Comment: ```suggestion "method": method, ``` ########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) +_ROUTE_PATHS_BY_ROUTER_ID: dict[int, dict[object, str]] = {} Review Comment: I just learned from Claude that this is an ideal case for using `WeakKeyDictionary`. 
More context: ```python from weakref import WeakKeyDictionary _ROUTE_PATHS_BY_ROUTER: WeakKeyDictionary = WeakKeyDictionary() def _route_paths_for(router) -> dict[object, str]: cached = _ROUTE_PATHS_BY_ROUTER.get(router) if cached is None: cached = { ep: route.path for route in getattr(router, "routes", ()) if (ep := getattr(route, "endpoint", None)) is not None and isinstance(getattr(route, "path", None), str) and route.path } _ROUTE_PATHS_BY_ROUTER[router] = cached return cached ``` Why this is better than id(router): - id() is recycled after an object is GC'd, so a stale entry could be returned for a different router that happens to land at the same address. A weak key holds the actual object identity, so that confusion can't happen. - A plain dict[int, ...] keyed by id() never drops entries — it grows one per app instance forever (negligible in a long-lived server, but it accumulates across the many apps created in the test suite). WeakKeyDictionary drops the entry automatically when the router dies. ########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) Review Comment: Or perhaps have a config option as a `dict` that defaults to the current value, so that users could define the mapping themselves and enable more routes. For example, showing the execution API in metrics or naming the plugin routes without code changes. ########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) +_ROUTE_PATHS_BY_ROUTER_ID: dict[int, dict[object, str]] = {} + + +def _get_api_surface(path: str) -> str | None: + for prefix, surface in _API_PATH_PREFIX_TO_SURFACE: + if path == prefix or path.startswith(f"{prefix}/"): + return surface + return None + + +def _get_status_family(status_code: int) -> str: + return f"{status_code // 100}xx" + + +def _get_route_tag(scope: Scope) -> str: + route = scope.get("route") + route_path = getattr(route, "path", None) + if isinstance(route_path, str) and route_path: + return route_path + + router = scope.get("router") + endpoint = scope.get("endpoint") + if router is not None and endpoint is not None: + route_paths = _ROUTE_PATHS_BY_ROUTER_ID.get(id(router)) + if route_paths is None: + route_paths = { + candidate_endpoint: candidate_route_path + for candidate_route in getattr(router, "routes", ()) + for candidate_endpoint, candidate_route_path in [ + ( + getattr(candidate_route, "endpoint", None), + getattr(candidate_route, "path", None), + ) + ] + if candidate_endpoint is not None + and isinstance(candidate_route_path, str) + and candidate_route_path + } + _ROUTE_PATHS_BY_ROUTER_ID[id(router)] = route_paths + + endpoint_route_path = route_paths.get(endpoint) + if isinstance(endpoint_route_path, str) and endpoint_route_path: + return endpoint_route_path + + return "unmatched" + + +def _emit_api_metrics( + *, + scope: Scope, + path: str, + method: str, + status_code: int, + duration_us: int, +) -> None: + api_surface = 
_get_api_surface(path) + if api_surface is None: + return + + # Keep tags bounded so API metrics remain usable across supported backends. + base_tags = { + "api_surface": api_surface, + "method": method or "UNKNOWN", + "route": _get_route_tag(scope), + } + status_family = _get_status_family(status_code) + request_tags = { + **base_tags, + "status_family": status_family, + } + duration_ms = duration_us / 1000.0 + + Stats.incr("api.requests", tags=request_tags) + Stats.timing("api.request.duration", duration_ms, tags=request_tags) + if status_code >= 500: + Stats.incr("api.request.errors", tags=base_tags) + + +class HttpMetricsMiddleware: Review Comment: Additionally, it would be nice to consolidate the `HttpMetricsMiddleware` into the existing `HttpAccessLogMiddleware`. Since they're almost identical and share the same `capture_send` utility. For example: ```python def __init__(self, app, ...): ... self._emit_metrics = is_metrics_enabled() # inside the existing finally, after computing path/status/method/duration_us: with contextlib.suppress(Exception): logger.info("request finished", ...) if self._emit_metrics: try: _emit_api_metrics(scope=scope, path=path, method=method, status_code=status, duration_us=duration_us) except Exception: logger.exception("failed to emit API metrics", ...) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
