kaxil commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r3292564464


##########
airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_05_02.py:
##########
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   **Cadwyn version is backdated; new changes should land in the latest 
unreleased file.**
   
   This PR adds `v2026_05_02.py`, which sits between the already-released 
`v2026_04_06` and the unreleased `v2026_06_16`/`v2026_06_30`. The execution-API 
versioning convention is that new `VersionChange` entries go into the most 
recent **unreleased** version file, not into a new file dated between existing 
versions. Inserting a backdated version breaks the linear, date-ordered 
migration chain that Cadwyn replays.
   
   Delete `v2026_05_02.py` and add `AddConnectionTestEndpoint` to 
`v2026_06_30.py` (or to whichever file is the latest unreleased one at merge 
time).
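
The placement rule can be sketched as a small check. This is only an illustration, not Airflow's or Cadwyn's actual tooling: `pick_target_version` and `is_backdated` are hypothetical helpers, and the released/unreleased lists are hard-coded from the file names mentioned in this comment rather than read from the repository.

```python
from __future__ import annotations

from datetime import date

# Version dates named in this review. The released/unreleased split is taken
# from the comment text, not discovered from the repository (assumption).
RELEASED = [date(2026, 4, 6)]
UNRELEASED = [date(2026, 6, 16), date(2026, 6, 30)]


def pick_target_version(unreleased: list[date]) -> date:
    """Return the version date that new VersionChange entries belong in."""
    if not unreleased:
        raise ValueError("no unreleased version file exists yet")
    # New changes go into the most recent unreleased version.
    return max(unreleased)


def is_backdated(candidate: date, released: list[date], unreleased: list[date]) -> bool:
    """A new version file is backdated if any existing version is dated after it."""
    return any(candidate < existing for existing in released + unreleased)


target = pick_target_version(UNRELEASED)
print(f"target file: v{target:%Y_%m_%d}.py")
print("v2026_05_02 backdated:", is_backdated(date(2026, 5, 2), RELEASED, UNRELEASED))
```

With the dates from this comment, the target is the `2026_06_30` file and the proposed `2026_05_02` file is flagged as backdated because two existing versions are dated after it.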



##########
airflow-core/src/airflow/models/crypto.py:
##########
@@ -93,6 +96,65 @@ def rotate(self, msg: bytes | str) -> bytes:
         return self._fernet.rotate(msg)
 
 
+class FernetFieldsMixin:
+    """Mixin providing Fernet-encrypted ``password`` and ``extra`` fields."""
+
+    _password: Mapped[str | None] = mapped_column("password", Text(), nullable=True)
+    _extra: Mapped[str | None] = mapped_column("extra", Text(), nullable=True)
+    is_encrypted: Mapped[bool] = mapped_column(Boolean, unique=False, default=False, nullable=False)
+    is_extra_encrypted: Mapped[bool] = mapped_column(Boolean, unique=False, default=False, nullable=False)
+
+    def get_password(self) -> str | None:
+        """Decrypt and return password."""
+        if self._password and self.is_encrypted:
+            fernet = get_fernet()
+            if not fernet.is_encrypted:
+                raise ValueError("Can't decrypt encrypted password, FERNET_KEY configuration is missing")
+            return fernet.decrypt(bytes(self._password, "utf-8")).decode()
+        return self._password
+
+    def set_password(self, value: str | None):
+        """Encrypt and store password."""
+        if value:
+            fernet = get_fernet()
+            self._password = fernet.encrypt(bytes(value, "utf-8")).decode()
+            self.is_encrypted = fernet.is_encrypted
+        else:

Review Comment:
   **Extracting `set_password` to the mixin silently changes falsy-value 
semantics.**
   
   The pre-PR `Connection.set_password` was a no-op when `value` was falsy. The 
mixin version here adds an `else` branch that resets `self._password = None` 
and `self.is_encrypted = False`. Any caller that passes `password=None` 
(typical for a PATCH with an update mask that omits the password, or a sparse 
update path that explicitly sends `null`) now wipes the existing encrypted 
password instead of leaving it alone.
   
   Either restore the no-op semantics (return early when `not value`), or treat 
this as a behavior change with a newsfragment and confirm that every PATCH/PUT 
path in the connections API filters out `None` before calling the setter.
   
   A unit test that round-trips `conn.set_password("secret"); 
conn.set_password(None); assert conn.password == "secret"` would lock in the 
no-op behavior.
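
A minimal sketch of the early-return option, with the suggested round-trip check. Everything here is a stand-in so the snippet runs without Airflow or SQLAlchemy: `FakeFernet` uses base64 purely as a reversible placeholder (it is not encryption), and `PasswordFieldSketch` is a hypothetical plain-Python substitute for the mixin, not the PR's code.

```python
from __future__ import annotations

import base64


class FakeFernet:
    """Placeholder for the object returned by get_fernet(); base64 is NOT encryption."""

    is_encrypted = True

    def encrypt(self, data: bytes) -> bytes:
        return base64.urlsafe_b64encode(data)

    def decrypt(self, token: bytes) -> bytes:
        return base64.urlsafe_b64decode(token)


def get_fernet() -> FakeFernet:
    # Hypothetical replacement for the real get_fernet() in this sketch.
    return FakeFernet()


class PasswordFieldSketch:
    """Plain-Python stand-in for the mixin's password handling (no ORM columns)."""

    def __init__(self) -> None:
        self._password: str | None = None
        self.is_encrypted = False

    def get_password(self) -> str | None:
        if self._password and self.is_encrypted:
            return get_fernet().decrypt(bytes(self._password, "utf-8")).decode()
        return self._password

    def set_password(self, value: str | None) -> None:
        # Pre-PR semantics: a falsy value leaves the stored password untouched.
        if not value:
            return
        fernet = get_fernet()
        self._password = fernet.encrypt(bytes(value, "utf-8")).decode()
        self.is_encrypted = fernet.is_encrypted

    password = property(get_password, set_password)


conn = PasswordFieldSketch()
conn.set_password("secret")
conn.set_password(None)  # must not wipe the stored value
assert conn.password == "secret"
```

If the `else` branch from the diff were kept instead of the early return, the final assertion would fail, which is the regression the test is meant to catch.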



##########
task-sdk/src/airflow/sdk/execution_time/connection_test_supervisor.py:
##########
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Supervised execution of TestConnection workloads."""
+
+from __future__ import annotations
+
+import uuid
+
+import structlog
+
+from airflow.sdk.api.client import Client
+from airflow.sdk.api.datamodels._generated import ConnectionTestState
+from airflow.sdk.definitions.connection import Connection as SDKConnection
+from airflow.sdk.exceptions import AirflowTaskTimeout
+from airflow.sdk.execution_time.timeout import TimeoutPosix
+
+__all__ = ["supervise_connection_test"]
+
+log = structlog.get_logger(logger_name="connection_test_supervisor")
+
+
+def supervise_connection_test(
+    *,
+    connection_test_id: uuid.UUID,
+    connection_id: str,
+    timeout: int,
+    token: str,
+    server: str,
+) -> int:
+    """Execute a connection test on the worker and report the result via the Execution API."""
+    client = Client(base_url=server, token=token)
+
+    try:
+        r = client.connection_tests.get_connection(connection_test_id)
+
+        conn = SDKConnection(
+            conn_id=r.conn_id,
+            conn_type=r.conn_type,
+            host=r.host,
+            login=r.login,
+            password=r.password,
+            schema=r.schema_,
+            port=r.port,
+            extra=r.extra,
+        )
+        with TimeoutPosix(
+            seconds=timeout,
+            error_message=f"Connection test timed out after {timeout}s",
+        ):
+            success, message = conn.test_connection()

Review Comment:
   **`_preset_connections` ContextVar only intercepts the `_get_connection` 
path; hooks reading env vars or secrets backends bypass it.**
   
   The preset short-circuits `_get_connection(conn_id)`, which works as long as 
`hook.test_connection()` resolves credentials through 
`BaseHook.get_connection`. But several real-world hooks resolve their own 
`conn_id` differently:
   
   - SSH/AWS hooks often read `AIRFLOW_CONN_<ID>` directly in `__init__` and 
store the resolved fields on the hook instance.
   - Anything that goes through `BaseHook.get_connection_from_secrets` hits the 
configured secrets backend (HashiCorp Vault, AWS Secrets Manager, etc.) before 
falling back to the metadata DB.
   
   In both cases the preset is silently ignored and `test_connection()` ends up 
testing whatever the real env var or backend holds, which can be a different 
credential (or none at all). The previous in-process implementation set 
`os.environ[conn_env_var] = conn.get_uri()` precisely to defend against this.
   
   Please verify with at least one hook in each class (SSH or AWS for the 
env-var case, any provider that resolves via a secrets backend for the second). 
If they pick up the wrong credentials, add an env-var override in the worker 
supervisor before invoking the hook.
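
One possible shape for the env-var override, as a sketch only. `connection_env_override` is a hypothetical helper, the connection id and URI are made-up example values, and in the supervisor the URI would presumably come from the connection object rather than a literal. The only Airflow convention relied on is the `AIRFLOW_CONN_<CONN_ID>` variable name.

```python
from __future__ import annotations

import os
from collections.abc import Iterator
from contextlib import contextmanager


@contextmanager
def connection_env_override(conn_id: str, uri: str) -> Iterator[str]:
    """Temporarily expose a connection URI as AIRFLOW_CONN_<CONN_ID>."""
    env_var = f"AIRFLOW_CONN_{conn_id.upper()}"
    previous = os.environ.get(env_var)
    os.environ[env_var] = uri
    try:
        yield env_var
    finally:
        # Restore the prior state so the credential under test does not leak
        # into later work running in the same process.
        if previous is None:
            os.environ.pop(env_var, None)
        else:
            os.environ[env_var] = previous


# Example values only; the hook call is represented by reading the variable.
with connection_env_override("preset_demo", "ssh://user:pass@example.com:22") as name:
    inside = os.environ[name]
after = os.environ.get("AIRFLOW_CONN_PRESET_DEMO")
print(name, inside, after)
```

Wrapping the `conn.test_connection()` call in such a block would cover hooks that read the environment variable directly. It does not by itself say anything about lookup order relative to a configured secrets backend, so that case still needs the manual verification requested above.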



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
