Copilot commented on code in PR #62343:
URL: https://github.com/apache/airflow/pull/62343#discussion_r3025380683


##########
airflow-core/src/airflow/migrations/versions/0111_3_2_0_add_connection_test_table.py:
##########
@@ -0,0 +1,77 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add connection_test_request table for async connection testing.
+
+Revision ID: a7e6d4c3b2f1
+Revises: a4c2d171ae18
+Create Date: 2026-02-22 00:00:00.000000
+
+"""
+
+from __future__ import annotations
+
+import sqlalchemy as sa
+from alembic import op
+
+from airflow.utils.sqlalchemy import UtcDateTime
+
+# revision identifiers, used by Alembic.
+revision = "a7e6d4c3b2f1"
+down_revision = "a4c2d171ae18"
+branch_labels = None
+depends_on = None
+airflow_version = "3.2.0"

Review Comment:
   This migration is chained after revision `a4c2d171ae18` (which is marked as 
`airflow_version = "3.3.0"` in `0110_3_3_0_xcom_dag_result.py`), but this file 
sets `airflow_version = "3.2.0"`. That version label (and the file name 
`0111_3_2_0_...`) conflicts with the actual alembic chain and results in 
`3.2.0` appearing *after* `3.3.0` in the revision-head mapping/docs. Please 
align `airflow_version` (and the migration filename if needed) with the next 
version after `3.3.0`, or adjust the `down_revision` if this truly belongs in 
the `3.2.0` chain.
   ```suggestion
   airflow_version = "3.4.0"
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py:
##########
@@ -257,6 +286,83 @@ def test_connection(test_body: ConnectionBody) -> 
ConnectionTestResponse:
         os.environ.pop(conn_env_var, None)
 
 
+@connections_router.post(
+    "/test-async",
+    status_code=status.HTTP_202_ACCEPTED,
+    responses=create_openapi_http_exception_doc([status.HTTP_403_FORBIDDEN, 
status.HTTP_409_CONFLICT]),
+    dependencies=[Depends(requires_access_connection(method="POST")), 
Depends(action_logging())],
+)
+def test_connection_async(
+    test_body: ConnectionTestRequestBody,
+    session: SessionDep,
+) -> ConnectionTestQueuedResponse:
+    """
+    Queue an async connection test to be executed on a worker.
+
+    The connection data is stored in the test request table and the worker
+    reads from there. Returns a token to poll for the result via
+    GET /connections/test-async/{token}.
+    """
+    _ensure_test_connection_enabled()
+
+    # Only one active test per connection_id at a time.
+    _check_no_active_test(test_body.connection_id, session)
+
+    connection_test = ConnectionTestRequest(
+        connection_id=test_body.connection_id,
+        conn_type=test_body.conn_type,
+        host=test_body.host,
+        login=test_body.login,
+        password=test_body.password,
+        schema=test_body.schema_,
+        port=test_body.port,
+        extra=test_body.extra,
+        commit_on_success=test_body.commit_on_success,
+        executor=test_body.executor,
+        queue=test_body.queue,
+    )
+    session.add(connection_test)
+    session.flush()

Review Comment:
   The "only one active test per connection_id" enforcement is a non-atomic 
check-then-insert (`_check_no_active_test()` followed by 
`session.add/flush()`). Two concurrent POSTs can both pass the check and create 
multiple active rows for the same `connection_id`, contradicting the API 
contract and making later 409 handling unreliable. Consider enforcing this at 
the DB level (unique constraint/index) or using a transactional locking 
strategy so the check and insert are serialized for a given `connection_id`.



##########
airflow-core/src/airflow/utils/db.py:
##########
@@ -115,7 +115,7 @@ class MappedClassProtocol(Protocol):
     "3.0.3": "fe199e1abd77",
     "3.1.0": "cc92b33c6709",
     "3.1.8": "509b94a1042d",
-    "3.2.0": "1d6611b6ab7c",
+    "3.2.0": "a7e6d4c3b2f1",
     "3.3.0": "a4c2d171ae18",

Review Comment:
   The `_REVISION_HEADS_MAP` entry for `3.2.0` now points to revision 
`a7e6d4c3b2f1`, but that revision’s migration file declares `down_revision = 
"a4c2d171ae18"` (a `3.3.0` migration). This makes the mapping non-monotonic 
(`3.2.0` > `3.3.0`) and will break logic that relies on version→head ordering. 
The new migration should be mapped to the *next* Airflow version after `3.3.0` 
(and `3.2.0` should continue to map to `1d6611b6ab7c`).
   ```suggestion
       "3.2.0": "1d6611b6ab7c",
       "3.3.0": "a4c2d171ae18",
       "3.4.0": "a7e6d4c3b2f1",
   ```



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/connections.py:
##########
@@ -257,6 +286,83 @@ def test_connection(test_body: ConnectionBody) -> 
ConnectionTestResponse:
         os.environ.pop(conn_env_var, None)
 
 
+@connections_router.post(
+    "/test-async",
+    status_code=status.HTTP_202_ACCEPTED,
+    responses=create_openapi_http_exception_doc([status.HTTP_403_FORBIDDEN, 
status.HTTP_409_CONFLICT]),
+    dependencies=[Depends(requires_access_connection(method="POST")), 
Depends(action_logging())],
+)
+def test_connection_async(
+    test_body: ConnectionTestRequestBody,
+    session: SessionDep,
+) -> ConnectionTestQueuedResponse:
+    """
+    Queue an async connection test to be executed on a worker.
+
+    The connection data is stored in the test request table and the worker
+    reads from there. Returns a token to poll for the result via
+    GET /connections/test-async/{token}.
+    """
+    _ensure_test_connection_enabled()
+
+    # Only one active test per connection_id at a time.
+    _check_no_active_test(test_body.connection_id, session)
+
+    connection_test = ConnectionTestRequest(
+        connection_id=test_body.connection_id,
+        conn_type=test_body.conn_type,
+        host=test_body.host,
+        login=test_body.login,
+        password=test_body.password,
+        schema=test_body.schema_,
+        port=test_body.port,
+        extra=test_body.extra,
+        commit_on_success=test_body.commit_on_success,
+        executor=test_body.executor,
+        queue=test_body.queue,
+    )
+    session.add(connection_test)
+    session.flush()
+
+    return ConnectionTestQueuedResponse(
+        token=connection_test.token,
+        connection_id=connection_test.connection_id,
+        state=connection_test.state,
+    )
+
+
+@connections_router.get(
+    "/test-async/{connection_test_token}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_connection(method="GET"))],
+)
+def get_connection_test(
+    connection_test_token: str,
+    session: SessionDep,
+) -> ConnectionTestStatusResponse:
+    """
+    Poll for the status of an async connection test.
+
+    Knowledge of the token serves as authorization — only the client
+    that initiated the test knows the crypto-random token.

Review Comment:
   The docstring claims that "knowledge of the token serves as authorization", 
but the route also enforces `requires_access_connection(method="GET")` (and 
therefore still requires normal authz). This is misleading for API consumers 
and could encourage unsafe assumptions. Please reword to clarify that the token 
is an identifier/capability for the test result, while the endpoint still 
requires authenticated/authorized access.
   ```suggestion
       The `connection_test_token` is a crypto-random identifier used to look up
       the specific test result. Access to this endpoint is still governed by
       standard authentication and connection-level authorization
       (see `requires_access_connection(method="GET")`).
   ```



##########
airflow-core/src/airflow/executors/base_executor.py:
##########
@@ -305,10 +315,24 @@ def heartbeat(self) -> None:
         self._emit_metrics(open_slots, num_running_workloads, 
num_queued_workloads)
         self.trigger_tasks(open_slots)
 
+        self.trigger_connection_tests()
+
         # Calling child class sync method
         self.log.debug("Calling the %s sync method", self.__class__)
         self.sync()
 
+    def trigger_connection_tests(self) -> None:
+        """Process queued connection tests, respecting available slot 
capacity."""
+        if not self.supports_connection_test or not 
self.queued_connection_tests:
+            return
+
+        available = self.slots_available
+        if available <= 0:
+            return
+
+        tests_to_run = list(self.queued_connection_tests.values())[:available]
+        self._process_workloads(tests_to_run)

Review Comment:
   `trigger_connection_tests()` uses `self.slots_available` to decide how many 
queued tests to run. Since `slots_available` subtracts 
`len(self.queued_connection_tests)`, this can deadlock when the queue reaches 
`parallelism` (available becomes 0, so nothing is ever processed). It can also 
under-schedule connection tests whenever tasks/callbacks are queued, because 
queued items reduce `slots_available` even though they still need to be 
executed. Compute capacity based on `parallelism - len(self.running)` (like 
`heartbeat()` does for tasks) and schedule up to the remaining open slots after 
`trigger_tasks()` has run.



##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -3122,6 +3136,86 @@ def _activate_assets_generate_warnings() -> 
Iterator[tuple[str, str]]:
             session.add(warning)
             existing_warned_dag_ids.add(warning.dag_id)
 
+    def _enqueue_connection_tests(self, *, session: Session) -> None:
+        """Enqueue pending connection tests to executors that support them."""
+        max_concurrency = conf.getint("core", 
"max_connection_test_concurrency", fallback=4)
+        timeout = conf.getint("core", "connection_test_timeout", fallback=60)
+
+        num_occupied_slots = sum(executor.slots_occupied for executor in 
self.executors)
+        parallelism_budget = conf.getint("core", "parallelism") - 
num_occupied_slots
+        if parallelism_budget <= 0:
+            return
+
+        active_count = session.scalar(
+            select(func.count(ConnectionTestRequest.id)).where(
+                ConnectionTestRequest.state.in_(DISPATCHED_STATES)
+            )
+        )
+        concurrency_budget = max_concurrency - (active_count or 0)
+        budget = min(concurrency_budget, parallelism_budget)
+        if budget <= 0:
+            return
+
+        pending_stmt = (
+            select(ConnectionTestRequest)
+            .where(ConnectionTestRequest.state == ConnectionTestState.PENDING)
+            .order_by(ConnectionTestRequest.created_at)
+            .limit(budget)
+        )
+        pending_stmt = with_row_locks(pending_stmt, session, 
of=ConnectionTestRequest, skip_locked=True)
+        pending_tests = session.scalars(pending_stmt).all()
+
+        if not pending_tests:
+            return
+
+        for ct in pending_tests:
+            executor = self._try_to_load_executor(ct, session)
+            if executor is not None and not executor.supports_connection_test:
+                executor = None
+            if executor is None:
+                reason = (
+                    f"No executor matches '{ct.executor}'"
+                    if ct.executor
+                    else "No executor supports connection testing"
+                )
+                ct.state = ConnectionTestState.FAILED
+                ct.result_message = reason
+                self.log.warning("Failing connection test %s: %s", ct.id, 
reason)
+                continue
+
+            workload = workloads.TestConnection.make(
+                connection_test_id=ct.id,
+                connection_id=ct.connection_id,
+                timeout=timeout,
+                queue=ct.queue,
+                generator=executor.jwt_generator,
+            )
+            executor.queue_workload(workload, session=session)
+            ct.state = ConnectionTestState.QUEUED
+
+        session.flush()
+
+    @provide_session
+    def _reap_stale_connection_tests(self, *, session: Session = NEW_SESSION) 
-> None:
+        """Mark connection tests that have exceeded their timeout as FAILED."""
+        timeout = conf.getint("core", "connection_test_timeout", fallback=60)
+        grace_period = max(30, timeout // 2)
+        cutoff = timezone.utcnow() - timedelta(seconds=timeout + grace_period)
+
+        stale_stmt = select(ConnectionTestRequest).where(
+            ConnectionTestRequest.state.in_(ACTIVE_STATES),

Review Comment:
   `_reap_stale_connection_tests()` reaps rows in `ACTIVE_STATES`, which 
currently includes `PENDING`. This means tests that are merely waiting for 
capacity (never dispatched) can be marked FAILED after `timeout+grace`, 
contradicting the doc/config text that targets stuck `QUEUED`/`RUNNING` tests 
and potentially causing false failures under load. Consider reaping only 
`DISPATCHED_STATES` (or at least excluding `PENDING`).
   ```suggestion
               ConnectionTestRequest.state.in_(DISPATCHED_STATES),
   ```



##########
task-sdk/src/airflow/sdk/api/client.py:
##########
@@ -867,6 +869,25 @@ def get_detail_response(self, ti_id: uuid.UUID) -> 
HITLDetailResponse:
         return HITLDetailResponse.model_validate_json(resp.read())
 
 
+class ConnectionTestOperations:
+    __slots__ = ("client",)
+
+    def __init__(self, client: Client):
+        self.client = client
+
+    def get_connection(self, connection_test_id: uuid.UUID) -> 
ConnectionResponse:
+        """Fetch connection data for a test request from the API server."""
+        resp = 
self.client.get(f"connection-tests/{connection_test_id}/connection")
+        return ConnectionResponse.model_validate_json(resp.read())
+

Review Comment:
   `ConnectionTestOperations.get_connection()` is typed/validated as 
`ConnectionResponse`, but the execution API endpoint returns 
`ConnectionTestConnectionResponse` (and the SDK already generated that model). 
Using the dedicated model here would better reflect the endpoint contract and 
avoid accidental coupling if `ConnectionResponse` later diverges (e.g., 
additional runtime-only fields).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to