This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ce72c02c5a0 Add token scope tests for Execution API routes. (#68918)
ce72c02c5a0 is described below
commit ce72c02c5a0f4ba2e83465fff7024a344b06ad53
Author: D. Ferruzzi <[email protected]>
AuthorDate: Tue Jun 23 16:34:44 2026 -0700
Add token scope tests for Execution API routes. (#68918)
---
.../execution_api/test_token_scope_boundaries.py | 100 +++++++++++++++++++++
1 file changed, 100 insertions(+)
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/test_token_scope_boundaries.py
b/airflow-core/tests/unit/api_fastapi/execution_api/test_token_scope_boundaries.py
new file mode 100644
index 00000000000..bbf2be8704f
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/test_token_scope_boundaries.py
@@ -0,0 +1,100 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Regression guard: assert token type boundaries on Execution API routes.
+
+``token:workload`` is a long-lived, minimal-privilege token visible in executor
+queues (Celery messages, K8s pod specs). Its security value depends on being
+accepted by as few routes as possible. See PR #62582 (foundation) and PR
#66989
+(two-token mechanism).
+
+This test checks the real API router and enforces two rules:
+
+1. Routes not listed in NON_DEFAULT_TOKEN_POLICY accept only
``token:execution``.
+2. Routes listed in NON_DEFAULT_TOKEN_POLICY accept exactly the declared types.
+
+Maintenance:
+- New route with default (execution-only) scope: no test change needed.
+- New route with non-default scope: add it to NON_DEFAULT_TOKEN_POLICY.
+- Route removed: remove it from NON_DEFAULT_TOKEN_POLICY (if present).
+- New token type: add it to the relevant entries in NON_DEFAULT_TOKEN_POLICY.
+"""
+
+from __future__ import annotations
+
+import pytest
+from fastapi.routing import APIRoute
+
+from airflow.api_fastapi.execution_api.routes import execution_api_router
+
+# Routes that intentionally deviate from the default (execution-only) policy.
+# Any route NOT listed here must accept only {"execution"}.
+NON_DEFAULT_TOKEN_POLICY: dict[str, set[str]] = {
+ # The /run endpoint exchanges a workload token for a short-lived execution
token.
+ "PATCH /task-instances/{task_instance_id}/run": {"execution", "workload"},
+ # Connection test routes run from a queued worker context (workload-only).
+ "PATCH /connection-tests/{connection_test_id}": {"workload"},
+ "GET /connection-tests/{connection_test_id}/connection": {"workload"},
+}
+
+
+def _all_route_policies() -> dict[str, set[str]]:
+ """Return a map of all API routes and their allowed token types."""
+ policy_map: dict[str, set[str]] = {}
+ for route in execution_api_router.routes:
+ if isinstance(route, APIRoute):
+ allowed_tokens = set(getattr(route, "allowed_token_types",
{"execution"}))
+ if route.methods:
+ for method in route.methods:
+ policy_map[f"{method} {route.path}"] = allowed_tokens
+ return policy_map
+
+
+class TestTokenScopeBoundaries:
+ """Execution API routes must not silently gain or lose token type
access."""
+
+ def test_all_default_routes_are_execution_only(self):
+ actual = _all_route_policies()
+ non_default = {
+ route: types
+ for route, types in actual.items()
+ if route not in NON_DEFAULT_TOKEN_POLICY and types != {"execution"}
+ }
+
+ assert not non_default, (
+ "Routes gained non-default token access without being declared in "
+ "NON_DEFAULT_TOKEN_POLICY:\n "
+ + "\n ".join(f"{route}: got {sorted(tokens)}" for route, tokens
in sorted(non_default.items()))
+ )
+
+ @pytest.mark.parametrize(("route", "expected"),
sorted(NON_DEFAULT_TOKEN_POLICY.items()))
+ def test_non_default_route_still_registered(self, route, expected):
+ actual = _all_route_policies()
+
+ assert route in actual, f"{route}: declared in policy but no longer
registered"
+
+ @pytest.mark.parametrize(("route", "expected"),
sorted(NON_DEFAULT_TOKEN_POLICY.items()))
+ def test_non_default_route_matches_policy(self, route, expected):
+ actual = _all_route_policies()
+ if route not in actual:
+ pytest.skip("Route not registered (caught by
test_non_default_route_still_registered)")
+
+ tokens_gained = actual[route] - expected
+ tokens_lost = expected - actual[route]
+
+ assert not tokens_gained, f"{route}: gained unexpected token types
{sorted(tokens_gained)}"
+ assert not tokens_lost, f"{route}: lost expected token types
{sorted(tokens_lost)}"