kaxil commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r3292563283
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py:
##########
@@ -259,6 +283,94 @@ def test_connection(test_body: ConnectionBody) ->
ConnectionTestResponse:
os.environ.pop(conn_env_var, None)
+@connections_router.post(
+ "/enqueue-test",
+ status_code=status.HTTP_202_ACCEPTED,
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_409_CONFLICT,
+ status.HTTP_422_UNPROCESSABLE_ENTITY,
+ ]
+ ),
+ dependencies=[Depends(requires_access_connection(method="POST")),
Depends(action_logging())],
+)
+def enqueue_connection_test(
Review Comment:
The POST endpoint authorizes against the `team_name` in `test_body`, but the
existing `Connection` row identified by `connection_id` may belong to a
different team. Combined with `commit_on_success=True`, a user with POST rights
on team A can submit a test for a team B `connection_id` with new credentials,
and on a successful test the row gets overwritten with team A's payload
(cross-team privilege escalation).
Resolve the team from the row before authz:
```python
actual_team = Connection.get_team_name(test_body.connection_id, session)
requires_access_connection("POST", team_name=actual_team)
```
and reject if `test_body.team_name` is set and doesn't match.
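To make the mismatch rule concrete, here is a minimal, framework-free sketch of the decision. `team_for_authz` and `TeamMismatchError` are hypothetical names, not existing Airflow APIs, and the "no existing row" branch (fall back to the requested team) is my assumption about how a brand-new `connection_id` should behave:

```python
from __future__ import annotations


class TeamMismatchError(ValueError):
    """Raised when the request names a team other than the one owning the row."""


def team_for_authz(
    *, row_exists: bool, stored_team: str | None, requested_team: str | None
) -> str | None:
    """Return the team name the access check should be evaluated against."""
    if not row_exists:
        # Assumption: nothing to overwrite yet, so the requested team applies.
        return requested_team
    if requested_team is not None and requested_team != stored_team:
        # The caller claims a team that does not own the existing row.
        raise TeamMismatchError(
            f"connection belongs to team {stored_team!r}, not {requested_team!r}"
        )
    # Always authorize against the owner of the existing row.
    return stored_team
```

The returned value is what would be passed to the access check, so a user with rights on team A alone can never be authorized against a team B row.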
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py:
##########
@@ -259,6 +283,94 @@ def test_connection(test_body: ConnectionBody) ->
ConnectionTestResponse:
os.environ.pop(conn_env_var, None)
+@connections_router.post(
+ "/enqueue-test",
+ status_code=status.HTTP_202_ACCEPTED,
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_409_CONFLICT,
+ status.HTTP_422_UNPROCESSABLE_ENTITY,
+ ]
+ ),
+ dependencies=[Depends(requires_access_connection(method="POST")),
Depends(action_logging())],
+)
+def enqueue_connection_test(
+ test_body: ConnectionTestRequestBody,
+ session: SessionDep,
+) -> ConnectionTestQueuedResponse:
+ """Enqueue a connection test for deferred execution on a worker; returns a
polling token."""
+ _ensure_test_connection_enabled()
+ _ensure_executor_is_configured(test_body.executor)
+
+ connection_test = ConnectionTestRequest(
+ connection_id=test_body.connection_id,
+ conn_type=test_body.conn_type,
+ host=test_body.host,
+ login=test_body.login,
+ password=test_body.password,
+ schema=test_body.schema_,
+ port=test_body.port,
+ extra=test_body.extra,
+ commit_on_success=test_body.commit_on_success,
+ executor=test_body.executor,
+ queue=test_body.queue,
+ )
+ session.add(connection_test)
+ try:
+ session.flush()
+ except IntegrityError:
+ raise HTTPException(
+ status.HTTP_409_CONFLICT,
+ f"An active connection test already exists for connection_id
`{test_body.connection_id}`.",
+ )
+
+ return ConnectionTestQueuedResponse(
+ token=connection_test.token,
+ connection_id=connection_test.connection_id,
+ state=connection_test.state,
+ )
+
+
+@connections_router.get(
+ "/enqueue-test/{connection_test_token}",
Review Comment:
Putting the worker callback token in the URL path means it ends up in API
server access logs, reverse-proxy logs, browser history, and any intermediate
observability that captures request URIs. A token sitting in a URL is treated
as low-sensitivity by every layer between the worker and the API, but it grants
the same authority as a JWT in an `Authorization` header.
Two options:
1. Move the token to an `Authorization: Bearer ...` header (or a POST body
field) and resolve the request by `request_id` alone.
2. Keep the URL stable but store only a SHA-256 hash of the token, then
accept the plaintext via header and compare hashes server-side.
Option 2 is friendlier if the worker has to retry without re-issuing tokens,
but option 1 is simpler.
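A rough sketch of option 2, using only the standard library. The helper names (`issue_token`, `token_matches`, `bearer_token`) are hypothetical and not part of this PR; the stored column would hold only the hex digest:

```python
from __future__ import annotations

import hashlib
import hmac
import secrets


def issue_token() -> tuple[str, str]:
    """Return (plaintext token for the worker, SHA-256 hex digest to persist)."""
    token = secrets.token_urlsafe(32)
    return token, hashlib.sha256(token.encode("utf-8")).hexdigest()


def token_matches(presented: str, stored_hash: str) -> bool:
    """Hash the presented token and compare digests in constant time."""
    presented_hash = hashlib.sha256(presented.encode("utf-8")).hexdigest()
    return hmac.compare_digest(presented_hash, stored_hash)


def bearer_token(authorization_header: str | None) -> str | None:
    """Extract the token from an ``Authorization: Bearer ...`` header value."""
    if not authorization_header:
        return None
    scheme, _, value = authorization_header.partition(" ")
    if scheme.lower() != "bearer" or not value.strip():
        return None
    return value.strip()
```

With this shape the plaintext never needs to appear in the URL or in the database, and a leaked table dump does not yield usable tokens.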
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3245,6 +3261,94 @@ def _cleanup_orphaned_asset_state(*, session: Session)
-> None:
)
session.execute(delete(AssetStateModel).where(AssetStateModel.asset_id.not_in(active_asset_ids)))
+ def _enqueue_connection_tests(self, *, session: Session) -> None:
+ """
+ Enqueue pending connection tests to executors that support them.
+
+ ``max_concurrency`` is per-scheduler, not global: with N HA schedulers
+ the worst-case per-tick dispatch is ``N * max_concurrency``. Connection
+ tests are user-initiated and rare, so the overshoot self-corrects via
+ the reaper. For a true global cap, wrap the budget+claim below in a
+ sentinel-row ``SELECT ... FOR UPDATE``.
+ """
+ max_concurrency = conf.getint("connection_test", "max_concurrency",
fallback=4)
+ timeout = conf.getint("connection_test", "timeout", fallback=60)
+
+ active_count = session.scalar(
+ select(func.count(ConnectionTestRequest.id)).where(
+ ConnectionTestRequest.state.in_(DISPATCHED_STATES)
+ )
+ )
+ budget = max_concurrency - (active_count or 0)
+ if budget <= 0:
+ return
+
+ pending_stmt = (
+ select(ConnectionTestRequest)
+ .where(ConnectionTestRequest.state == ConnectionTestState.PENDING)
+ .order_by(ConnectionTestRequest.created_at)
+ .limit(budget)
+ )
+ pending_stmt = with_row_locks(pending_stmt, session,
of=ConnectionTestRequest, skip_locked=True)
+ pending_tests = session.scalars(pending_stmt).all()
+
+ if not pending_tests:
+ return
+
+ for ct in pending_tests:
+ executor = self._try_to_load_executor(ct, session)
Review Comment:
`_get_workload_team_name` calls `ct.get_dag_id()`, which returns `None` for
`ConnectionTestRequest` because there's no DAG behind a connection test. The
`team_name` resolution then collapses to `None`, and every connection-test
workload routes through the global executor regardless of which team the
connection belongs to. The team-scoped executor pool the rest of the PR sets up
is bypassed for this workload type.
Resolve the team directly from the row:
```python
team_name = Connection.get_team_name(ct.connection_id, session)
executor = self._try_to_load_executor(executor_name, team_name=team_name)
```
A regression test that spins up two team-scoped executors and asserts a
`ConnectionTest` workload for team B lands on team B's executor, not the global
one, would lock this in.
##########
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_05_02.py:
##########
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
This PR adds `v2026_05_02.py`, which sits between the already-released
`v2026_04_06` and the unreleased `v2026_06_16` / `v2026_06_30`. The
execution-API versioning convention is that new `VersionChange` entries go into
the most recent **unreleased** version file, not a new file dated between
releases. Inserting a backdated version breaks the linear migration chain that
Cadwyn replays.
Delete `v2026_05_02.py` and add `AddConnectionTestEndpoint` to
`v2026_06_30.py` (or whichever is the latest unreleased file at merge time).
##########
airflow-core/src/airflow/models/crypto.py:
##########
@@ -93,6 +96,65 @@ def rotate(self, msg: bytes | str) -> bytes:
return self._fernet.rotate(msg)
+class FernetFieldsMixin:
+ """Mixin providing Fernet-encrypted ``password`` and ``extra`` fields."""
+
+ _password: Mapped[str | None] = mapped_column("password", Text(),
nullable=True)
+ _extra: Mapped[str | None] = mapped_column("extra", Text(), nullable=True)
+ is_encrypted: Mapped[bool] = mapped_column(Boolean, unique=False,
default=False, nullable=False)
+ is_extra_encrypted: Mapped[bool] = mapped_column(Boolean, unique=False,
default=False, nullable=False)
+
+ def get_password(self) -> str | None:
+ """Decrypt and return password."""
+ if self._password and self.is_encrypted:
+ fernet = get_fernet()
+ if not fernet.is_encrypted:
+ raise ValueError("Can't decrypt encrypted password, FERNET_KEY
configuration is missing")
+ return fernet.decrypt(bytes(self._password, "utf-8")).decode()
+ return self._password
+
+ def set_password(self, value: str | None):
+ """Encrypt and store password."""
+ if value:
+ fernet = get_fernet()
+ self._password = fernet.encrypt(bytes(value, "utf-8")).decode()
+ self.is_encrypted = fernet.is_encrypted
+ else:
Review Comment:
The pre-PR `Connection.set_password` was a no-op when `value` was falsy. The
mixin version here adds an `else` branch that resets `self._password = None`
and `self.is_encrypted = False`. Any caller flowing `password=None` (typical
for a PATCH with an update mask that omits the password, or a sparse update
path that explicitly sends `null`) now wipes the existing encrypted password
instead of leaving it alone.
Either restore the no-op semantics (return early when `not value`), or treat
this as a behavior change with a newsfragment and confirm every PATCH/PUT path
in the connections API filters `None` before calling the setter.
A unit test that round-trips `conn.set_password("secret");
conn.set_password(None); assert conn.password == "secret"` would lock this in.
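For illustration, a self-contained sketch of the no-op semantics and the round-trip check. `PasswordHolder` is a hypothetical stand-in for the mixin, and base64 is used only as a placeholder for Fernet so the example runs without a key; it is not encryption:

```python
from __future__ import annotations

import base64


class PasswordHolder:
    """Hypothetical stand-in for the mixin; base64 substitutes for Fernet."""

    def __init__(self) -> None:
        self._password: str | None = None
        self.is_encrypted = False

    def set_password(self, value: str | None) -> None:
        if not value:
            # No-op on None or "": keep whatever is already stored.
            return
        self._password = base64.b64encode(value.encode("utf-8")).decode("ascii")
        self.is_encrypted = True

    def get_password(self) -> str | None:
        if self._password and self.is_encrypted:
            return base64.b64decode(self._password).decode("utf-8")
        return self._password


conn = PasswordHolder()
conn.set_password("secret")
conn.set_password(None)  # must not wipe the stored value
assert conn.get_password() == "secret"
```

If clearing a password is a real use case, an explicit method for it (rather than overloading `None`) would keep sparse updates safe.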
##########
task-sdk/src/airflow/sdk/execution_time/connection_test_supervisor.py:
##########
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Supervised execution of TestConnection workloads."""
+
+from __future__ import annotations
+
+import uuid
+
+import structlog
+
+from airflow.sdk.api.client import Client
+from airflow.sdk.api.datamodels._generated import ConnectionTestState
+from airflow.sdk.definitions.connection import Connection as SDKConnection
+from airflow.sdk.exceptions import AirflowTaskTimeout
+from airflow.sdk.execution_time.timeout import TimeoutPosix
+
+__all__ = ["supervise_connection_test"]
+
+log = structlog.get_logger(logger_name="connection_test_supervisor")
+
+
+def supervise_connection_test(
+ *,
+ connection_test_id: uuid.UUID,
+ connection_id: str,
+ timeout: int,
+ token: str,
+ server: str,
+) -> int:
+ """Execute a connection test on the worker and report the result via the
Execution API."""
+ client = Client(base_url=server, token=token)
+
+ try:
+ r = client.connection_tests.get_connection(connection_test_id)
+
+ conn = SDKConnection(
+ conn_id=r.conn_id,
+ conn_type=r.conn_type,
+ host=r.host,
+ login=r.login,
+ password=r.password,
+ schema=r.schema_,
+ port=r.port,
+ extra=r.extra,
+ )
+ with TimeoutPosix(
+ seconds=timeout,
+ error_message=f"Connection test timed out after {timeout}s",
+ ):
+ success, message = conn.test_connection()
Review Comment:
The `_preset_connections` ContextVar short-circuits
`_get_connection(conn_id)` from the preset, which works as long as
`hook.test_connection()` resolves credentials through
`BaseHook.get_connection`. But several real-world hooks resolve their own
`conn_id` differently:
- SSH/AWS hooks often read `AIRFLOW_CONN_<ID>` directly in `__init__` and
store the resolved fields on the hook instance.
- Anything that goes through `Connection.get_connection_from_secrets` hits the
configured secrets backend (HashiCorp Vault, AWS Secrets Manager, etc.) before
falling back to the metadata DB.
In both cases the preset is silently ignored and `test_connection()` ends up
testing whatever the real env/backend says, which can be a different credential
(or none at all). The previous in-process implementation set
`os.environ[conn_env_var] = conn.get_uri()` exactly to defend against this.
Can you verify with at least one hook in each class (SSH or AWS for the
env-var case, any provider that resolves via secrets backend for the second),
and if they pick the wrong creds, add an env-var override in the worker
supervisor before invoking the hook?
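Something along these lines is what I have in mind for the override, as a sketch only. `connection_env_override` is a hypothetical helper, and it assumes the usual `AIRFLOW_CONN_<CONN_ID uppercased>` naming:

```python
import os
from contextlib import contextmanager


@contextmanager
def connection_env_override(conn_id: str, conn_uri: str):
    """Temporarily expose ``conn_uri`` under the connection's env var name."""
    env_var = f"AIRFLOW_CONN_{conn_id.upper()}"
    previous = os.environ.get(env_var)
    os.environ[env_var] = conn_uri
    try:
        yield env_var
    finally:
        # Restore the prior state so the candidate credentials do not leak
        # into whatever runs next in this process.
        if previous is None:
            os.environ.pop(env_var, None)
        else:
            os.environ[env_var] = previous
```

The supervisor would wrap the `conn.test_connection()` call in this context manager. Note that this only covers the env-var case; a configured secrets backend that holds the same `conn_id` would still need to be checked separately.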
##########
task-sdk/src/airflow/sdk/execution_time/connection_test_supervisor.py:
##########
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Supervised execution of TestConnection workloads."""
+
+from __future__ import annotations
+
+import uuid
+
+import structlog
+
+from airflow.sdk.api.client import Client
+from airflow.sdk.api.datamodels._generated import ConnectionTestState
+from airflow.sdk.definitions.connection import Connection as SDKConnection
+from airflow.sdk.exceptions import AirflowTaskTimeout
+from airflow.sdk.execution_time.timeout import TimeoutPosix
+
+__all__ = ["supervise_connection_test"]
+
+log = structlog.get_logger(logger_name="connection_test_supervisor")
+
+
+def supervise_connection_test(
+ *,
+ connection_test_id: uuid.UUID,
+ connection_id: str,
+ timeout: int,
+ token: str,
+ server: str,
+) -> int:
+ """Execute a connection test on the worker and report the result via the
Execution API."""
+ client = Client(base_url=server, token=token)
+
+ try:
+ r = client.connection_tests.get_connection(connection_test_id)
+
+ conn = SDKConnection(
+ conn_id=r.conn_id,
+ conn_type=r.conn_type,
+ host=r.host,
+ login=r.login,
+ password=r.password,
+ schema=r.schema_,
+ port=r.port,
+ extra=r.extra,
+ )
+ with TimeoutPosix(
Review Comment:
Following up on the prior SIGALRM/Windows concern. Renaming to
`TimeoutPosix` makes the platform constraint explicit, thanks. But the import
of `signal.SIGALRM` inside `timeout.py` is still unconditional, so a Windows
worker process raises `AttributeError` at import time before it can even decide
whether to call the timeout. A `hasattr(signal, "SIGALRM")` gate at the import
site, or a `threading.Timer` fallback, closes the loop.
If Windows isn't a supported worker platform for this feature, please
document that on `supports_connection_test` (and ideally fail fast at
supervisor start rather than at the first connection test) so users understand
the boundary.
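A sketch of the kind of gate I mean, written against the standard library only. `alarm_timeouts_supported` and `alarm_timeout` are hypothetical names, not the SDK's `TimeoutPosix`, and the handler must be installed from the main thread:

```python
import signal
from contextlib import contextmanager


def alarm_timeouts_supported() -> bool:
    """True only where SIGALRM-based timers exist (not on Windows)."""
    return hasattr(signal, "SIGALRM") and hasattr(signal, "setitimer")


@contextmanager
def alarm_timeout(seconds: float, error_message: str = "Timed out"):
    """Raise TimeoutError if the wrapped block runs longer than ``seconds``."""
    if not alarm_timeouts_supported():
        # Fail fast with a clear message instead of an AttributeError on import.
        raise NotImplementedError("SIGALRM-based timeouts are unavailable on this platform")

    def _handler(signum, frame):
        raise TimeoutError(error_message)

    # SIGALRM is only looked up after the gate above has passed.
    previous = signal.signal(signal.SIGALRM, _handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Cancel the timer and restore the previous handler.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)
```

Calling `alarm_timeouts_supported()` once at supervisor start would give the fail-fast behaviour, and the module stays importable on every platform.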