pierrejeambrun commented on code in PR #64523: URL: https://github.com/apache/airflow/pull/64523#discussion_r3349356617
########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) +_ROUTE_PATHS_BY_ROUTER_ID: dict[int, dict[object, str]] = {} + + +def _get_api_surface(path: str) -> str | None: + for prefix, surface in _API_PATH_PREFIX_TO_SURFACE: + if path == prefix or path.startswith(f"{prefix}/"): + return surface + return None + + +def _get_status_family(status_code: int) -> str: + return f"{status_code // 100}xx" + + +def _get_route_tag(scope: Scope) -> str: + route = scope.get("route") + route_path = getattr(route, "path", None) + if isinstance(route_path, str) and route_path: + return route_path + + router = scope.get("router") + endpoint = scope.get("endpoint") + if router is not None and endpoint is not None: + route_paths = _ROUTE_PATHS_BY_ROUTER_ID.get(id(router)) + if route_paths is None: + route_paths = { + candidate_endpoint: candidate_route_path + for candidate_route in getattr(router, "routes", ()) + for candidate_endpoint, candidate_route_path in [ + ( + getattr(candidate_route, "endpoint", None), + getattr(candidate_route, "path", None), + ) + ] + if candidate_endpoint is not None + and isinstance(candidate_route_path, str) + and candidate_route_path + } + _ROUTE_PATHS_BY_ROUTER_ID[id(router)] = route_paths + + endpoint_route_path = route_paths.get(endpoint) + if isinstance(endpoint_route_path, str) and endpoint_route_path: + return endpoint_route_path + + return "unmatched" + + +def _emit_api_metrics( + *, + scope: Scope, + path: str, + method: str, + status_code: int, + duration_us: int, +) -> None: + api_surface = _get_api_surface(path) + if api_surface is None: + return + + # Keep tags bounded so API metrics remain usable across supported backends. + base_tags = { + "api_surface": api_surface, + "method": method or "UNKNOWN", + "route": _get_route_tag(scope), + } + status_family = _get_status_family(status_code) + request_tags = { + **base_tags, + "status_family": status_family, + } + duration_ms = duration_us / 1000.0 + + Stats.incr("api.requests", tags=request_tags) + Stats.timing("api.request.duration", duration_ms, tags=request_tags) + if status_code >= 500: + Stats.incr("api.request.errors", tags=base_tags) + + +class HttpMetricsMiddleware: Review Comment: I don't think so. It was done like this in the original draft, and asked specifically to split them because they serve different purpose. But we can extract a base class to factorize some code if needed. ########## airflow-core/src/airflow/api_fastapi/common/http_metrics.py: ########## @@ -0,0 +1,168 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""HTTP API metrics middleware.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING + +import structlog + +from airflow._shared.observability.metrics.stats import Stats +from airflow.api_fastapi.common.http_paths import HEALTH_PATHS + +if TYPE_CHECKING: + from starlette.types import ASGIApp, Message, Receive, Scope, Send + +logger = structlog.get_logger(logger_name="http.metrics") + +_API_PATH_PREFIX_TO_SURFACE = ( + ("/api/v2", "public"), + ("/ui", "ui"), +) +_ROUTE_PATHS_BY_ROUTER_ID: dict[int, dict[object, str]] = {} + + +def _get_api_surface(path: str) -> str | None: + for prefix, surface in _API_PATH_PREFIX_TO_SURFACE: + if path == prefix or path.startswith(f"{prefix}/"): + return surface + return None + + +def _get_status_family(status_code: int) -> str: + return f"{status_code // 100}xx" + + +def _get_route_tag(scope: Scope) -> str: + route = scope.get("route") + route_path = getattr(route, "path", None) + if isinstance(route_path, str) and route_path: + return route_path + + router = scope.get("router") + endpoint = scope.get("endpoint") + if router is not None and endpoint is not None: + route_paths = _ROUTE_PATHS_BY_ROUTER_ID.get(id(router)) + if route_paths is None: + route_paths = { + candidate_endpoint: candidate_route_path + for candidate_route in getattr(router, "routes", ()) + for candidate_endpoint, candidate_route_path in [ + ( + getattr(candidate_route, "endpoint", None), + getattr(candidate_route, "path", None), + ) + ] + if candidate_endpoint is not None + and isinstance(candidate_route_path, str) + and candidate_route_path + } + _ROUTE_PATHS_BY_ROUTER_ID[id(router)] = route_paths + + endpoint_route_path = route_paths.get(endpoint) + if isinstance(endpoint_route_path, str) and endpoint_route_path: + return endpoint_route_path + + return "unmatched" + + +def _emit_api_metrics( + *, + scope: Scope, + path: str, + method: str, + status_code: int, + duration_us: int, +) -> None: + api_surface = _get_api_surface(path) + if api_surface is None: + return + + # Keep tags bounded so API metrics remain usable across supported backends. + base_tags = { + "api_surface": api_surface, + "method": method or "UNKNOWN", + "route": _get_route_tag(scope), + } + status_family = _get_status_family(status_code) + request_tags = { + **base_tags, + "status_family": status_family, + } + duration_ms = duration_us / 1000.0 + + Stats.incr("api.requests", tags=request_tags) + Stats.timing("api.request.duration", duration_ms, tags=request_tags) + if status_code >= 500: + Stats.incr("api.request.errors", tags=base_tags) + + +class HttpMetricsMiddleware: Review Comment: I don't think so. It was done like this in the original draft, and asked specifically to split them for separation of concerns and clarity. But we can extract a base class to factorize some code if needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
