kaxil commented on code in PR #60108:
URL: https://github.com/apache/airflow/pull/60108#discussion_r2991910313


##########
airflow-core/tests/unit/api_fastapi/execution_api/conftest.py:
##########
@@ -53,6 +57,12 @@ async def mock_jwt_bearer(request: Request):
     exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
 
     with TestClient(app, headers={"Authorization": "Bearer fake"}) as client:
+        mock_generator = MagicMock(spec=JWTGenerator)
+        mock_generator.generate.return_value = "mock-execution-token"
+        lifespan.registry.register_value(JWTGenerator, mock_generator)
+
         yield client
 
+        lifespan.registry.close()
+
     exec_app.dependency_overrides.pop(_jwt_bearer, None)

Review Comment:
   `lifespan.registry.close()` is new here (no other test file does this), and 
the registry is shared across all tests via `cached_app`. Closing it could 
break subsequent tests that try to look up services from the same registry. The 
existing pattern in other test files (e.g., `test_task_instances.py`, 
`test_router.py`) registers values on `lifespan.registry` without closing it 
afterward. I'd drop this `close()` call to match what the rest of the test 
suite does.
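
A minimal sketch of the concern, using a hypothetical `FakeRegistry` stand-in rather than the real registry class. It assumes that `close()` drops every registration on the shared object, which is what would break later lookups; `OtherService` and `client_fixture` are illustrative names only.

```python
from contextlib import contextmanager


class FakeRegistry:
    """Hypothetical stand-in for the shared registry; not the real class."""

    def __init__(self):
        self._values = {}

    def register_value(self, svc_type, value):
        self._values[svc_type] = value

    def get(self, svc_type):
        try:
            return self._values[svc_type]
        except KeyError:
            raise LookupError(f"no service registered for {svc_type.__name__}") from None

    def close(self):
        # Assumption: closing clears every registration, not only ours.
        self._values.clear()


class JWTGenerator:  # placeholder type used only as a registry key
    pass


class OtherService:  # placeholder for a service another test registered
    pass


SHARED = FakeRegistry()  # plays the role of the registry behind `cached_app`


@contextmanager
def client_fixture(close_on_teardown):
    SHARED.register_value(JWTGenerator, "mock-generator")
    yield SHARED
    if close_on_teardown:
        SHARED.close()


def other_service_survives(close_on_teardown):
    """Register a second service, run the fixture, then look the service up."""
    SHARED.register_value(OtherService, "other")
    with client_fixture(close_on_teardown):
        pass
    try:
        SHARED.get(OtherService)
        return True
    except LookupError:
        return False
```

Under that assumption, the lookup only survives when the fixture leaves the shared registry open.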



##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -142,6 +144,11 @@ async def dispatch(self, request: Request, call_next):
                     validator: JWTValidator = await services.aget(JWTValidator)
                     claims = await validator.avalidated_claims(token, {})
 
+                    # Workload tokens are long-lived and meant to survive queue
+                    # wait times so avoid refreshing them.
+                    if claims.get("scope") == "workload":
+                        return response
+

Review Comment:
   The early return for workload tokens correctly skips the refresh logic. 
Note that it only applies once validation has succeeded: if 
`avalidated_claims` raises for a workload token, execution falls into the outer 
`except` and the response still gets returned (with a warning log). Might be 
worth a comment clarifying that workload token validation errors are handled by 
the outer catch.
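
A rough sketch of the control flow in question, with the suggested comment in place. `FakeValidator`, the dict-based `response`, and the `refreshed_token` key are hypothetical stand-ins for the real middleware objects, not the actual Airflow code.

```python
import asyncio
import logging

log = logging.getLogger("refresh_sketch")


class FakeValidator:
    """Hypothetical validator: returns fixed claims or raises."""

    def __init__(self, claims=None, error=None):
        self._claims = claims or {}
        self._error = error

    async def avalidated_claims(self, token, required):
        if self._error is not None:
            raise self._error
        return self._claims


async def dispatch(response, token, validator):
    """Return `response`, adding a refreshed token only for short-lived tokens."""
    try:
        claims = await validator.avalidated_claims(token, {})

        # Workload tokens are long-lived, so they are never refreshed.
        # Validation failures (for any scope) are handled by the except below.
        if claims.get("scope") == "workload":
            return response

        response["refreshed_token"] = "new-token"  # placeholder for the real refresh
    except Exception as err:
        log.warning("Token refresh skipped: %s", err)
    return response
```

In this sketch a workload token and a failed validation both return the response unchanged; only the second case logs a warning.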



##########
airflow-core/src/airflow/api_fastapi/execution_api/app.py:
##########
@@ -332,10 +343,29 @@ async def always_allow(request: Request):
                 )
                 return TIToken(id=ti_id, claims={"scope": "execution"})
 
+            # Override _container (the svcs service locator behind DepContainer).
+            # The default _container reads request.app.state.svcs_registry, but
+            # Cadwyn's versioned sub-apps don't inherit the main app's state,
+            # so lookups raise ServiceNotFoundError. This registry provides
+            # services needed by routes called during dag.test().
+            #

Review Comment:
   The stub `JWTGenerator` uses `secrets.token_urlsafe(32)` as the secret key, 
so a new key is generated each time `InProcessExecutionAPI.app` is built. 
Since `app` is a `cached_property`, that happens once per instance, and the key 
is stable for the lifetime of the object. But the token generated here by 
`ti_run` won't be validated by anything (since `_jwt_bearer` is also overridden 
with `always_allow`), so this stub only exists to satisfy the 
`services.get(JWTGenerator)` call. A brief comment noting that these tokens are 
never validated in `dag.test()` mode would help future readers.
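
For reference, a small sketch of the `cached_property` behaviour described above. `InProcessAPI` is a hypothetical stand-in for the real class, and the returned dict stands in for the built app.

```python
import secrets
from functools import cached_property


class InProcessAPI:
    """Hypothetical stand-in for the object that owns the cached `app`."""

    @cached_property
    def app(self):
        # Runs once per instance; later accesses reuse the cached value.
        secret_key = secrets.token_urlsafe(32)
        return {"secret_key": secret_key}


api = InProcessAPI()

# Two accesses on the same instance see the same key.
same_key = api.app["secret_key"] == api.app["secret_key"]

# A fresh instance builds its own app and therefore its own key.
other_key_differs = InProcessAPI().app["secret_key"] != api.app["secret_key"]
```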



##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py:
##########
@@ -286,14 +291,18 @@ def ti_run(
         if ti.next_method:
             context.next_method = ti.next_method
             context.next_kwargs = ti.next_kwargs
-
-        return context
     except SQLAlchemyError:
         log.exception("Error marking Task Instance state as running")
         raise HTTPException(
             status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Database error occurred"
         )
 
+    generator: JWTGenerator = services.get(JWTGenerator)
+    execution_token = generator.generate(extras={"sub": str(task_instance_id)})

Review Comment:
   Token generation happens outside the `try...except SQLAlchemyError` block. 
If `services.get(JWTGenerator)` or `generator.generate()` raises (missing 
service, crypto error, etc.), the client gets a raw 500 with no useful detail. 
Worth wrapping this in its own try/except, or at minimum a log line, so 
operators can tell the difference between "database error" and "token 
generation failed".
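
Something along these lines, as a sketch only. `issue_execution_token`, `TokenGenerationError`, and `FakeGenerator` are hypothetical names for illustration; the real route would presumably raise an `HTTPException` with its own detail string instead.

```python
import logging

log = logging.getLogger("ti_run_sketch")


class TokenGenerationError(RuntimeError):
    """Hypothetical error type standing in for an HTTP 500 with a clear detail."""


class FakeGenerator:
    """Hypothetical generator used only to exercise the helper below."""

    def generate(self, extras):
        return f"token-for-{extras['sub']}"


def issue_execution_token(get_generator, task_instance_id):
    """Look up the generator and mint a token, logging failures distinctly."""
    try:
        generator = get_generator()
        return generator.generate(extras={"sub": str(task_instance_id)})
    except Exception as err:
        # Separate log message so this is not mistaken for a database error.
        log.exception("Error generating execution token for %s", task_instance_id)
        raise TokenGenerationError("Token generation failed") from err
```

The point is only that the failure gets its own log line and message, so it cannot be confused with the `SQLAlchemyError` path.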



##########
airflow-core/src/airflow/api_fastapi/auth/tokens.py:
##########
@@ -418,6 +418,10 @@ class JWTGenerator:
 
     kid: str = attrs.field(default=attrs.Factory(_generate_kid, takes_self=True))
     valid_for: float
+    workload_valid_for: float = attrs.field(

Review Comment:
   The `workload_valid_for` default reads from config via `_conf_factory`, and 
`_jwt_generator()` in `app.py` also reads the same config key and passes it 
explicitly. The explicit kwarg takes precedence, so the default factory never 
runs in production. Having two code paths that reference the same config key is 
easy to get out of sync -- consider dropping the attrs default (make it 
required like `valid_for`) and always passing it explicitly, or drop the 
explicit kwarg in `_jwt_generator()` and let the default handle it.
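
A sketch of why the default never runs when the kwarg is passed. This uses `dataclasses` from the standard library as a stand-in for attrs; the `Generator` class, the `_workload_default` factory, and the numeric values are arbitrary placeholders, not the real config or class.

```python
from dataclasses import dataclass, field

factory_calls = []


def _workload_default():
    # Stand-in for reading the config key inside the default factory.
    factory_calls.append("default")
    return 86400.0


@dataclass
class Generator:
    valid_for: float
    workload_valid_for: float = field(default_factory=_workload_default)


# Mirrors a builder that reads the same config key and passes it explicitly.
explicit = Generator(valid_for=600.0, workload_valid_for=3600.0)
calls_after_explicit = len(factory_calls)

# Only when the kwarg is omitted does the default factory run.
implicit = Generator(valid_for=600.0)
calls_after_implicit = len(factory_calls)
```

With the explicit kwarg the factory is never called, so the two config reads can drift apart without any test noticing.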



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

