This is an automated email from the ASF dual-hosted git repository.

bugraoz93 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a0be5d8c5f8 [AIP-94] Create a CLI airflowctl client and adopt it in existing commands (#68175)
a0be5d8c5f8 is described below

commit a0be5d8c5f83a37a8ab7b9d3bf3c88ac636c19aa
Author: Bugra Ozturk <[email protected]>
AuthorDate: Thu Jun 11 19:08:29 2026 +0200

    [AIP-94] Create a CLI airflowctl client and adopt it in existing commands (#68175)
    
    * Route airflow dags/pools/assets CLI commands through the API server via the airflowctl client
    
    * Add auth-manager get_cli_user tests and tidy CLI migration newsfragment
    
    * Rename significant with PR number
    
    * Add deprecation warnings with airflowctl command replacement
    
    * Fix failure when the auth manager isn't initialized
    
    * Clarify deprecation wording for migrated airflow CLI commands
    
    * Fix airflow assets list with watchers and CLI migration test failures
---
 airflow-core/newsfragments/68175.significant.rst   |  24 ++
 airflow-core/src/airflow/api/client/__init__.py    |  26 --
 .../src/airflow/api/client/local_client.py         | 107 ------
 .../api_fastapi/auth/managers/base_auth_manager.py |  16 +
 .../auth/managers/simple/simple_auth_manager.py    |   3 +
 airflow-core/src/airflow/cli/api_client.py         | 129 ++++++++
 .../src/airflow/cli/commands/asset_command.py      |  66 ++--
 .../src/airflow/cli/commands/dag_command.py        |  60 ++--
 .../src/airflow/cli/commands/pool_command.py       | 100 +++---
 airflow-core/src/airflow/cli/utils.py              |  36 +-
 .../tests/unit/cli/commands/test_asset_command.py  | 144 +++-----
 .../unit/cli/commands/test_command_deprecations.py |  72 ++++
 .../tests/unit/cli/commands/test_dag_command.py    | 364 +++++++++------------
 .../tests/unit/cli/commands/test_pool_command.py   | 350 +++++++++-----------
 airflow-core/tests/unit/cli/conftest.py            |  20 ++
 airflow-core/tests/unit/cli/test_api_client.py     | 140 ++++++++
 airflow-core/tests/unit/cli/test_utils.py          |  48 +++
 .../providers/fab/auth_manager/fab_auth_manager.py |  24 ++
 .../unit/fab/auth_manager/test_fab_auth_manager.py |  26 ++
 .../keycloak/auth_manager/keycloak_auth_manager.py |  30 +-
 .../auth_manager/test_keycloak_auth_manager.py     |  21 +-
 scripts/ci/prek/known_airflow_exceptions.txt       |   2 +-
 22 files changed, 1053 insertions(+), 755 deletions(-)

diff --git a/airflow-core/newsfragments/68175.significant.rst 
b/airflow-core/newsfragments/68175.significant.rst
new file mode 100644
index 00000000000..4c9f7b4dffc
--- /dev/null
+++ b/airflow-core/newsfragments/68175.significant.rst
@@ -0,0 +1,24 @@
+Airflow CLI commands are moving to talk to the API server
+
+The CLI is being migrated to reach Airflow through the API server (via the ``airflowctl``
+client) instead of the metadata database directly. Migrated so far: ``dags trigger``,
+``dags delete``, ``pools`` (list/get/set/delete/import/export), and ``assets materialize``;
+this fragment is updated as more commands migrate rather than adding new ones.
+
+These commands now require a reachable API server and mint a short-lived token in memory
+(set ``AIRFLOW_CLI_TOKEN`` for auth managers that cannot mint locally, or for remote servers).
+``airflow.api.client`` is removed — use ``airflow.cli.api_client.get_cli_api_client``.
+
+Each migrated command emits a ``RemovedInAirflow4Warning`` and will be removed in a future
+Airflow release; use the equivalent ``airflowctl`` command instead.
+
+* Types of change
+
+  * [ ] Dag changes
+  * [ ] Config changes
+  * [ ] API changes
+  * [ ] CLI changes
+  * [x] Behaviour changes
+  * [ ] Plugin changes
+  * [ ] Dependency changes
+  * [x] Code interface changes
diff --git a/airflow-core/src/airflow/api/client/__init__.py 
b/airflow-core/src/airflow/api/client/__init__.py
deleted file mode 100644
index f0d236b9019..00000000000
--- a/airflow-core/src/airflow/api/client/__init__.py
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""API Client that allows interacting with Airflow API."""
-
-from __future__ import annotations
-
-from airflow.api.client.local_client import Client
-
-
-def get_current_api_client() -> Client:
-    return Client()
diff --git a/airflow-core/src/airflow/api/client/local_client.py 
b/airflow-core/src/airflow/api/client/local_client.py
deleted file mode 100644
index 057d6d99c7c..00000000000
--- a/airflow-core/src/airflow/api/client/local_client.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-"""Local client API."""
-
-from __future__ import annotations
-
-import httpx
-
-from airflow.api.common import delete_dag, trigger_dag
-from airflow.exceptions import AirflowBadRequest, PoolNotFound
-from airflow.models.pool import Pool
-from airflow.utils.types import DagRunTriggeredByType
-
-
-class Client:
-    """Local API client implementation."""
-
-    def __init__(self, auth=None, session: httpx.Client | None = None):
-        self._session: httpx.Client = session or httpx.Client()
-        if auth:
-            self._session.auth = auth
-
-    def trigger_dag(
-        self,
-        dag_id,
-        run_id=None,
-        conf=None,
-        logical_date=None,
-        triggering_user_name=None,
-        replace_microseconds=True,
-    ) -> dict | None:
-        dag_run = trigger_dag.trigger_dag(
-            dag_id=dag_id,
-            triggered_by=DagRunTriggeredByType.CLI,
-            triggering_user_name=triggering_user_name,
-            run_id=run_id,
-            conf=conf,
-            logical_date=logical_date,
-            replace_microseconds=replace_microseconds,
-        )
-        if dag_run:
-            return {
-                "conf": dag_run.conf,
-                "dag_id": dag_run.dag_id,
-                "dag_run_id": dag_run.run_id,
-                "data_interval_start": dag_run.data_interval_start,
-                "data_interval_end": dag_run.data_interval_end,
-                "end_date": dag_run.end_date,
-                "last_scheduling_decision": dag_run.last_scheduling_decision,
-                "logical_date": dag_run.logical_date,
-                "run_type": dag_run.run_type,
-                "start_date": dag_run.start_date,
-                "state": dag_run.state,
-                "triggering_user_name": dag_run.triggering_user_name,
-            }
-        return dag_run
-
-    def delete_dag(self, dag_id):
-        count = delete_dag.delete_dag(dag_id)
-        return f"Removed {count} record(s)"
-
-    def get_pool(self, name):
-        pool = Pool.get_pool(pool_name=name)
-        if not pool:
-            raise PoolNotFound(f"Pool {name} not found")
-        return pool.pool, pool.slots, pool.description, pool.include_deferred, 
pool.team_name
-
-    def get_pools(self):
-        return [(p.pool, p.slots, p.description, p.include_deferred, 
p.team_name) for p in Pool.get_pools()]
-
-    def create_pool(self, name, slots, description, include_deferred, 
team_name=None):
-        if not (name and name.strip()):
-            raise AirflowBadRequest("Pool name shouldn't be empty")
-        pool_name_length = Pool.pool.property.columns[0].type.length
-        if len(name) > pool_name_length:
-            raise AirflowBadRequest(f"Pool name cannot be more than 
{pool_name_length} characters")
-        try:
-            slots = int(slots)
-        except ValueError:
-            raise AirflowBadRequest(f"Invalid value for `slots`: {slots}")
-        pool = Pool.create_or_update_pool(
-            name=name,
-            slots=slots,
-            description=description,
-            include_deferred=include_deferred,
-            team_name=team_name,
-        )
-        return pool.pool, pool.slots, pool.description, pool.team_name
-
-    def delete_pool(self, name):
-        pool = Pool.delete_pool(name=name)
-        return pool.pool, pool.slots, pool.description
diff --git 
a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py 
b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
index 4ba08e5b447..7f62d8af27c 100644
--- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
+++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py
@@ -180,6 +180,22 @@ class BaseAuthManager(Generic[T], LoggingMixin, 
metaclass=ABCMeta):
             self.serialize_user(user)
         )
 
+    def get_cli_user(self) -> T:
+        """
+        Return the user the local CLI acts as when calling the API server.
+
+        The Airflow CLI mints a short-lived JWT for this user (via :meth:`generate_jwt`)
+        so it can talk to the API server without persisting any credentials. A generic
+        auth manager cannot know which user is authorized for local CLI access, so the
+        default raises. Auth managers that support local CLI usage should override this
+        to return an administrative user. Otherwise, operators must provide a token via
+        the ``AIRFLOW_CLI_TOKEN`` environment variable.
+        """
+        raise NotImplementedError(
+            f"{type(self).__name__} does not support minting a local CLI 
token. "
+            "Set the AIRFLOW_CLI_TOKEN environment variable with a valid API 
token instead."
+        )
+
     @abstractmethod
     def get_url_login(self, **kwargs) -> str:
         """Return the login page url."""
diff --git 
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
 
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
index 0559a388156..0deaadf4034 100644
--- 
a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
+++ 
b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py
@@ -238,6 +238,9 @@ class 
SimpleAuthManager(BaseAuthManager[SimpleAuthManagerUser]):
     def serialize_user(self, user: SimpleAuthManagerUser) -> dict[str, Any]:
         return {"sub": user.username, "role": user.role, "teams": user.teams}
 
+    def get_cli_user(self) -> SimpleAuthManagerUser:
+        return SimpleAuthManagerUser(username="cli", 
role=SimpleAuthManagerRole.ADMIN.name)
+
     def is_authorized_configuration(
         self,
         *,
diff --git a/airflow-core/src/airflow/cli/api_client.py 
b/airflow-core/src/airflow/cli/api_client.py
new file mode 100644
index 00000000000..d1ff5e2ddd9
--- /dev/null
+++ b/airflow-core/src/airflow/cli/api_client.py
@@ -0,0 +1,129 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Provide the :mod:`airflowctl` HTTP API client to the local Airflow CLI.
+
+The local CLI talks to the API server through the same typed client that ``airflowctl``
+uses, but without the keyring-backed credential store. For each invocation it mints a
+short-lived JWT **in memory** (via the active auth manager) and builds a client with it;
+nothing is persisted. Set the ``AIRFLOW_CLI_TOKEN`` environment variable to supply a token
+explicitly (required for auth managers whose tokens cannot be minted locally, such as
+Keycloak, or when targeting a remote API server).
+"""
+
+from __future__ import annotations
+
+import atexit
+import os
+from collections.abc import Callable
+from functools import wraps
+from typing import TYPE_CHECKING, TypeVar
+
+import httpx
+
+# Re-exported so command modules import the client surface from a single place.
+from airflowctl.api.client import NEW_API_CLIENT, Client, ClientKind
+
+from airflow.configuration import conf
+from airflow.typing_compat import ParamSpec
+
+if TYPE_CHECKING:
+    from airflow.api_fastapi.auth.managers.base_auth_manager import 
BaseAuthManager
+
+__all__ = [
+    "NEW_API_CLIENT",
+    "Client",
+    "ClientKind",
+    "get_cli_api_client",
+    "provide_api_client",
+]
+
+PS = ParamSpec("PS")
+RT = TypeVar("RT")
+
+# Validity of the in-memory CLI token. It only needs to outlive a single CLI 
command
+# (including the client's request retries) and is never persisted or logged.
+_CLI_TOKEN_VALID_FOR_SECONDS = 300
+
+_api_client: Client | None = None
+
+
+def _resolve_base_url() -> str:
+    """Resolve the API server base URL from configuration."""
+    base_url = conf.get("api", "base_url", fallback=None)
+    if base_url:
+        return base_url
+    host = conf.get("api", "host", fallback="localhost") or "localhost"
+    port = conf.get("api", "port", fallback="8080") or "8080"
+    return f"http://{host}:{port}"
+
+
+def _mint_cli_token() -> str:
+    """
+    Return a token for the CLI to authenticate against the API server.
+
+    Prefers an explicit ``AIRFLOW_CLI_TOKEN`` (the universal override), 
otherwise mints a
+    short-lived JWT through the active auth manager. The token lives only in 
this process.
+    """
+    if token := os.environ.get("AIRFLOW_CLI_TOKEN"):
+        return token
+
+    from airflow.api_fastapi.app import get_auth_manager, init_auth_manager
+
+    # The CLI runs outside the API server, so the auth manager singleton is 
usually not
+    # initialized yet; initialize it on demand. ``init_auth_manager`` reuses 
the cached
+    # instance when one already exists, so this is safe to call here.
+    try:
+        auth_manager: BaseAuthManager = get_auth_manager()
+    except RuntimeError:
+        auth_manager = init_auth_manager()
+    return auth_manager.generate_jwt(
+        auth_manager.get_cli_user(),
+        expiration_time_in_seconds=_CLI_TOKEN_VALID_FOR_SECONDS,
+    )
+
+
+def get_cli_api_client() -> Client:
+    """Return the process-wide singleton airflowctl client for the local 
CLI."""
+    global _api_client
+    if _api_client is None:
+        _api_client = Client(
+            base_url=_resolve_base_url(),
+            token=_mint_cli_token(),
+            kind=ClientKind.CLI,
+            limits=httpx.Limits(max_keepalive_connections=1, 
max_connections=1),
+        )
+        atexit.register(_api_client.close)
+    return _api_client
+
+
+def provide_api_client(func: Callable[PS, RT]) -> Callable[PS, RT]:
+    """
+    Provide the CLI API client to the decorated command function.
+
+    Injects ``api_client=get_cli_api_client()`` when the caller does not pass 
one. Tests
+    (or callers that already hold a client) pass ``api_client=`` explicitly to 
bypass it.
+    """
+
+    @wraps(func)
+    def wrapper(*args, **kwargs) -> RT:
+        if "api_client" not in kwargs:
+            kwargs["api_client"] = get_cli_api_client()
+        return func(*args, **kwargs)
+
+    return wrapper
diff --git a/airflow-core/src/airflow/cli/commands/asset_command.py 
b/airflow-core/src/airflow/cli/commands/asset_command.py
index 15430b76488..29c1025958a 100644
--- a/airflow-core/src/airflow/cli/commands/asset_command.py
+++ b/airflow-core/src/airflow/cli/commands/asset_command.py
@@ -17,21 +17,17 @@
 
 from __future__ import annotations
 
-import logging
 import typing
 
 from sqlalchemy import select
 
-from airflow.api.common.trigger_dag import trigger_dag
 from airflow.api_fastapi.core_api.datamodels.assets import AssetAliasResponse, 
AssetResponse
-from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse
+from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client
 from airflow.cli.simple_table import AirflowConsole
-from airflow.exceptions import AirflowConfigException
-from airflow.models.asset import AssetAliasModel, AssetModel, 
TaskOutletAssetReference
+from airflow.cli.utils import deprecated_for_airflowctl
+from airflow.models.asset import AssetAliasModel, AssetModel
 from airflow.utils import cli as cli_utils
-from airflow.utils.platform import getuser
 from airflow.utils.session import NEW_SESSION, provide_session
-from airflow.utils.types import DagRunTriggeredByType, DagRunType
 
 if typing.TYPE_CHECKING:
     from typing import Any
@@ -40,8 +36,6 @@ if typing.TYPE_CHECKING:
 
     from airflow.api_fastapi.core_api.base import BaseModel
 
-log = logging.getLogger(__name__)
-
 
 def _list_asset_aliases(args, *, session: Session) -> tuple[Any, 
type[BaseModel]]:
     aliases = 
session.scalars(select(AssetAliasModel).order_by(AssetAliasModel.name))
@@ -49,7 +43,13 @@ def _list_asset_aliases(args, *, session: Session) -> 
tuple[Any, type[BaseModel]
 
 
 def _list_assets(args, *, session: Session) -> tuple[Any, type[BaseModel]]:
-    assets = session.scalars(select(AssetModel).order_by(AssetModel.name))
+    assets = 
session.scalars(select(AssetModel).order_by(AssetModel.name)).all()
+    for asset in assets:
+        for watcher in asset.watchers:
+            # ``AssetWatcherModel`` has no ``created_date`` column; like the 
public API
+            # serializer, derive it from the watcher's trigger so 
``AssetResponse`` validation
+            # succeeds. Set on the instance so ``model_validate`` reads it via 
``from_attributes``.
+            watcher.created_date = watcher.trigger.created_date
     return assets, AssetResponse
 
 
@@ -124,48 +124,38 @@ def asset_details(args, *, session: Session = 
NEW_SESSION) -> None:
 
 
 @cli_utils.action_cli
-@provide_session
-def asset_materialize(args, *, session: Session = NEW_SESSION) -> None:
+@deprecated_for_airflowctl("airflowctl assets materialize")
+@provide_api_client
+def asset_materialize(args, api_client: Client = NEW_API_CLIENT) -> None:
     """
     Materialize the specified asset.
 
     This is done by finding the DAG with the asset defined as outlet, and 
create
-    a run for that DAG.
+    a run for that DAG. Resolving the DAG and creating the run is handled by 
the API
+    server; the asset is identified here by its name and/or URI.
     """
     if not args.name and not args.uri:
         raise SystemExit("Either --name or --uri is required")
 
-    stmt = 
select(TaskOutletAssetReference.dag_id).join(TaskOutletAssetReference.asset)
     select_message_parts = []
     if args.name:
-        stmt = stmt.where(AssetModel.name == args.name)
         select_message_parts.append(f"name {args.name}")
     if args.uri:
-        stmt = stmt.where(AssetModel.uri == args.uri)
         select_message_parts.append(f"URI {args.uri}")
-    dag_id_it = 
iter(session.scalars(stmt.group_by(TaskOutletAssetReference.dag_id).limit(2)))
     select_message = " and ".join(select_message_parts)
 
-    if (dag_id := next(dag_id_it, None)) is None:
+    matches = [
+        asset
+        for asset in api_client.assets.list().assets
+        if (not args.name or asset.name == args.name) and (not args.uri or 
asset.uri == args.uri)
+    ]
+    if not matches:
         raise SystemExit(f"Asset with {select_message} does not exist.")
-    if next(dag_id_it, None) is not None:
-        raise SystemExit(f"More than one DAG materializes asset with 
{select_message}.")
-
-    try:
-        user = getuser()
-    except AirflowConfigException as e:
-        log.warning("Failed to get user name from os: %s, not setting the 
triggering user", e)
-        user = None
-    dagrun = trigger_dag(
-        dag_id=dag_id,
-        triggered_by=DagRunTriggeredByType.CLI,
-        run_type=DagRunType.ASSET_MATERIALIZATION,
-        triggering_user_name=user,
-        session=session,
-    )
-    if dagrun is not None:
-        data = [DAGRunResponse.model_validate(dagrun).model_dump(mode="json")]
-    else:
-        data = []
+    if len(matches) > 1:
+        raise SystemExit(f"More than one asset exists with {select_message}.")
 
-    AirflowConsole().print_as(data=data, output=args.output)
+    dag_run = api_client.assets.materialize(asset_id=str(matches[0].id))
+    AirflowConsole().print_as(
+        data=[dag_run.model_dump(mode="json")],
+        output=args.output,
+    )
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py 
b/airflow-core/src/airflow/cli/commands/dag_command.py
index d9d8388055a..1d5e4237576 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -33,14 +33,15 @@ from typing import TYPE_CHECKING, cast
 from sqlalchemy import func, select
 
 from airflow._shared.timezones import timezone
-from airflow.api.client import get_current_api_client
+from airflow.api_fastapi.core_api.datamodels.dag_run import 
TriggerDAGRunPostBody
 from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
+from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client
 from airflow.cli.simple_table import AirflowConsole
-from airflow.cli.utils import fetch_dag_run_from_run_id_or_logical_date_string
+from airflow.cli.utils import deprecated_for_airflowctl, 
fetch_dag_run_from_run_id_or_logical_date_string
 from airflow.dag_processing.bundles.base import unpack_bundle_version
 from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.dag_processing.dagbag import BundleDagBag, DagBag, sync_bag_to_db
-from airflow.exceptions import AirflowConfigException, AirflowException
+from airflow.exceptions import AirflowException
 from airflow.jobs.job import Job
 from airflow.models import DagModel, DagRun, TaskInstance
 from airflow.models.errors import ParseImportError
@@ -55,7 +56,6 @@ from airflow.utils.cli import (
 )
 from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
 from airflow.utils.helpers import ask_yesno
-from airflow.utils.platform import getuser
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import NEW_SESSION, create_session, provide_session
 from airflow.utils.state import DagRunState
@@ -77,47 +77,41 @@ log = logging.getLogger(__name__)
 
 
 @cli_utils.action_cli
+@deprecated_for_airflowctl("airflowctl dags trigger")
 @providers_configuration_loaded
-def dag_trigger(args) -> None:
+@provide_api_client
+def dag_trigger(args, api_client: Client = NEW_API_CLIENT) -> None:
     """Create a dag run for the specified dag."""
-    api_client = get_current_api_client()
-    try:
-        user = getuser()
-    except AirflowConfigException as e:
-        log.warning("Failed to get user name from os: %s, not setting the 
triggering user", e)
-        user = None
-    try:
-        message = api_client.trigger_dag(
-            dag_id=args.dag_id,
-            run_id=args.run_id,
-            conf=args.conf,
-            logical_date=args.logical_date,
-            triggering_user_name=user,
-            replace_microseconds=args.replace_microseconds,
-        )
-        AirflowConsole().print_as(
-            data=[message] if message is not None else [],
-            output=args.output,
-        )
-    except OSError as err:
-        raise AirflowException(err)
+    run_conf = json.loads(args.conf) if args.conf is not None else None
+    if run_conf is not None and not isinstance(run_conf, dict):
+        raise ValueError("DagRun conf must be a JSON object or null")
+    # The core_api request models are the source of truth; they are 
wire-compatible with
+    # the airflowctl client's generated models (the API server uses 
populate_by_name).
+    trigger_body = TriggerDAGRunPostBody(
+        dag_run_id=args.run_id,
+        conf=run_conf,
+        logical_date=args.logical_date,
+    )
+    dag_run = api_client.dags.trigger(dag_id=args.dag_id, 
trigger_dag_run=trigger_body)  # type: ignore[arg-type]
+    AirflowConsole().print_as(
+        data=[dag_run.model_dump(mode="json")],
+        output=args.output,
+    )
 
 
 @cli_utils.action_cli
+@deprecated_for_airflowctl("airflowctl dags delete")
 @providers_configuration_loaded
-def dag_delete(args) -> None:
+@provide_api_client
+def dag_delete(args, api_client: Client = NEW_API_CLIENT) -> None:
     """Delete all DB records related to the specified dag."""
-    api_client = get_current_api_client()
     if (
         args.yes
         or input("This will drop all existing records related to the specified 
DAG. Proceed? (y/n)").upper()
         == "Y"
     ):
-        try:
-            message = api_client.delete_dag(dag_id=args.dag_id)
-            print(message)
-        except OSError as err:
-            raise AirflowException(err)
+        api_client.dags.delete(dag_id=args.dag_id)
+        print(f"Removed DAG {args.dag_id}")
     else:
         print("Cancelled")
 
diff --git a/airflow-core/src/airflow/cli/commands/pool_command.py 
b/airflow-core/src/airflow/cli/commands/pool_command.py
index c2e624d23a6..a51351f73bd 100644
--- a/airflow-core/src/airflow/cli/commands/pool_command.py
+++ b/airflow-core/src/airflow/cli/commands/pool_command.py
@@ -23,9 +23,12 @@ import json
 import os
 from json import JSONDecodeError
 
-from airflow.api.client import get_current_api_client
+from airflowctl.api.operations import ServerResponseError
+
+from airflow.api_fastapi.core_api.datamodels.pools import PoolBody
+from airflow.cli.api_client import NEW_API_CLIENT, Client, provide_api_client
 from airflow.cli.simple_table import AirflowConsole
-from airflow.exceptions import PoolNotFound
+from airflow.cli.utils import deprecated_for_airflowctl
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import suppress_logs_and_warning
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
@@ -36,66 +39,78 @@ def _show_pools(pools, output):
         data=pools,
         output=output,
         mapper=lambda x: {
-            "pool": x[0],
-            "slots": x[1],
-            "description": x[2],
-            "include_deferred": x[3],
-            "team_name": x[4],
+            "pool": x.name,
+            "slots": x.slots,
+            "description": x.description,
+            "include_deferred": x.include_deferred,
+            "team_name": x.team_name,
         },
     )
 
 
+@deprecated_for_airflowctl("airflowctl pools list")
 @suppress_logs_and_warning
 @providers_configuration_loaded
-def pool_list(args):
+@provide_api_client
+def pool_list(args, api_client: Client = NEW_API_CLIENT):
     """Display info of all the pools."""
-    api_client = get_current_api_client()
-    pools = api_client.get_pools()
+    pools = api_client.pools.list().pools
     _show_pools(pools=pools, output=args.output)
 
 
+@deprecated_for_airflowctl("airflowctl pools get")
 @suppress_logs_and_warning
 @providers_configuration_loaded
-def pool_get(args):
+@provide_api_client
+def pool_get(args, api_client: Client = NEW_API_CLIENT):
     """Display pool info by a given name."""
-    api_client = get_current_api_client()
     try:
-        pools = [api_client.get_pool(name=args.pool)]
+        pools = [api_client.pools.get(pool_name=args.pool)]
         _show_pools(pools=pools, output=args.output)
-    except PoolNotFound:
-        raise SystemExit(f"Pool {args.pool} does not exist")
+    except ServerResponseError as e:
+        if e.response.status_code == 404:
+            raise SystemExit(f"Pool {args.pool} does not exist")
+        raise
 
 
 @cli_utils.action_cli
+@deprecated_for_airflowctl("airflowctl pools create")
 @suppress_logs_and_warning
 @providers_configuration_loaded
-def pool_set(args):
+@provide_api_client
+def pool_set(args, api_client: Client = NEW_API_CLIENT):
     """Create new pool with a given name and slots."""
-    api_client = get_current_api_client()
-    api_client.create_pool(
+    # core_api PoolBody is the source of truth and is wire-compatible with the 
airflowctl
+    # client's generated model (the API server uses populate_by_name).
+    pool_body = PoolBody(
         name=args.pool,
         slots=args.slots,
         description=args.description,
         include_deferred=args.include_deferred,
         team_name=args.team_name,
     )
+    api_client.pools.create(pool=pool_body)  # type: ignore[arg-type]
     print(f"Pool {args.pool} created")
 
 
 @cli_utils.action_cli
+@deprecated_for_airflowctl("airflowctl pools delete")
 @suppress_logs_and_warning
 @providers_configuration_loaded
-def pool_delete(args):
+@provide_api_client
+def pool_delete(args, api_client: Client = NEW_API_CLIENT):
     """Delete pool by a given name."""
-    api_client = get_current_api_client()
     try:
-        api_client.delete_pool(name=args.pool)
+        api_client.pools.delete(pool=args.pool)
         print(f"Pool {args.pool} deleted")
-    except PoolNotFound:
-        raise SystemExit(f"Pool {args.pool} does not exist")
+    except ServerResponseError as e:
+        if e.response.status_code == 404:
+            raise SystemExit(f"Pool {args.pool} does not exist")
+        raise
 
 
 @cli_utils.action_cli
+@deprecated_for_airflowctl("airflowctl pools import")
 @suppress_logs_and_warning
 @providers_configuration_loaded
 def pool_import(args):
@@ -108,6 +123,7 @@ def pool_import(args):
     print(f"Uploaded {len(pools)} pool(s)")
 
 
+@deprecated_for_airflowctl("airflowctl pools export")
 @providers_configuration_loaded
 def pool_export(args):
     """Export all the pools to the file."""
@@ -115,10 +131,9 @@ def pool_export(args):
     print(f"Exported {len(pools)} pools to {args.file}")
 
 
-def pool_import_helper(filepath):
+@provide_api_client
+def pool_import_helper(filepath, api_client: Client = NEW_API_CLIENT):
     """Help import pools from the json file."""
-    api_client = get_current_api_client()
-
     with open(filepath) as poolfile:
         data = poolfile.read()
     try:
@@ -129,34 +144,33 @@ def pool_import_helper(filepath):
     failed = []
     for k, v in pools_json.items():
         if isinstance(v, dict) and "slots" in v and "description" in v:
-            pools.append(
-                api_client.create_pool(
-                    name=k,
-                    slots=v["slots"],
-                    description=v["description"],
-                    include_deferred=v.get("include_deferred", False),
-                    team_name=v.get("team_name"),
-                )
+            pool_body = PoolBody(
+                name=k,
+                slots=v["slots"],
+                description=v["description"],
+                include_deferred=v.get("include_deferred", False),
+                team_name=v.get("team_name"),
             )
+            pools.append(api_client.pools.create(pool=pool_body))  # type: 
ignore[arg-type]
         else:
             failed.append(k)
     return pools, failed
 
 
-def pool_export_helper(filepath):
+@provide_api_client
+def pool_export_helper(filepath, api_client: Client = NEW_API_CLIENT):
     """Help export all the pools to the json file."""
-    api_client = get_current_api_client()
     pool_dict = {}
-    pools = api_client.get_pools()
+    pools = api_client.pools.list().pools
     for pool in pools:
         entry = {
-            "slots": pool[1],
-            "description": pool[2],
-            "include_deferred": pool[3],
+            "slots": pool.slots,
+            "description": pool.description,
+            "include_deferred": pool.include_deferred,
         }
-        if pool[4] is not None:
-            entry["team_name"] = pool[4]
-        pool_dict[pool[0]] = entry
+        if pool.team_name is not None:
+            entry["team_name"] = pool.team_name
+        pool_dict[pool.name] = entry
     with open(filepath, "w") as poolfile:
         poolfile.write(json.dumps(pool_dict, sort_keys=True, indent=4))
     return pools
diff --git a/airflow-core/src/airflow/cli/utils.py 
b/airflow-core/src/airflow/cli/utils.py
index 870f045071b..a19d761c304 100644
--- a/airflow-core/src/airflow/cli/utils.py
+++ b/airflow-core/src/airflow/cli/utils.py
@@ -17,8 +17,13 @@
 
 from __future__ import annotations
 
+import functools
 import sys
-from typing import TYPE_CHECKING
+import warnings
+from collections.abc import Callable
+from typing import TYPE_CHECKING, TypeVar
+
+from airflow.exceptions import RemovedInAirflow4Warning
 
 # Placeholder for masking sensitive values in CLI output
 SENSITIVE_PLACEHOLDER = "***"
@@ -32,6 +37,35 @@ if TYPE_CHECKING:
 
     from airflow.models.dagrun import DagRun
 
+F = TypeVar("F", bound=Callable[..., object])
+
+
+def deprecated_for_airflowctl(replacement: str) -> Callable[[F], F]:
+    """
+    Mark an ``airflow`` CLI command as deprecated in favour of an 
``airflowctl`` equivalent.
+
+    These commands now reach Airflow through the API server via the 
``airflowctl`` client. They
+    are kept for backwards compatibility but will be removed in a future 
Airflow release; users
+    should switch to ``airflowctl`` directly.
+
+    :param replacement: The equivalent ``airflowctl`` command, e.g. 
``airflowctl dags trigger``.
+    """
+
+    def decorator(func: F) -> F:
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            warnings.warn(
+                f"This `airflow` CLI command is deprecated and will be removed 
in a future "
+                f"Airflow release. Use `{replacement}` instead.",
+                RemovedInAirflow4Warning,
+                stacklevel=2,
+            )
+            return func(*args, **kwargs)
+
+        return wrapper  # type: ignore[return-value]
+
+    return decorator
+
 
 class CliConflictError(Exception):
     """Error for when CLI commands are defined twice by different sources."""
diff --git a/airflow-core/tests/unit/cli/commands/test_asset_command.py 
b/airflow-core/tests/unit/cli/commands/test_asset_command.py
index 6efd8293534..d30e36eb04d 100644
--- a/airflow-core/tests/unit/cli/commands/test_asset_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_asset_command.py
@@ -21,7 +21,7 @@ from __future__ import annotations
 import json
 import os
 import typing
-from unittest import mock
+from types import SimpleNamespace
 
 import pytest
 
@@ -37,7 +37,9 @@ if typing.TYPE_CHECKING:
 pytestmark = [pytest.mark.db_test]
 
 
-@pytest.fixture(scope="module", autouse=True)
+# Not autouse: only the DB-backed tests below request it, so the mocked 
(non-DB)
+# ``assets materialize`` tests stay free of any database access.
+@pytest.fixture(scope="module")
 def prepare_examples():
     with conf_vars({("core", "load_examples"): "True"}):
         parse_and_sync_to_db(os.devnull)
@@ -46,17 +48,12 @@ def prepare_examples():
     clear_db_dags()
 
 
-@pytest.fixture(autouse=True)
-def clear_runs():
-    clear_db_runs()
-
-
 @pytest.fixture(scope="module")
 def parser() -> ArgumentParser:
     return cli_parser.get_parser()
 
 
-def test_cli_assets_list(parser: ArgumentParser, stdout_capture) -> None:
+def test_cli_assets_list(prepare_examples, parser: ArgumentParser, 
stdout_capture) -> None:
     args = parser.parse_args(["assets", "list", "--output=json"])
     with stdout_capture as capture:
         asset_command.asset_list(args)
@@ -67,7 +64,7 @@ def test_cli_assets_list(parser: ArgumentParser, 
stdout_capture) -> None:
     assert any(asset["uri"] == "s3://dag1/output_1.txt" for asset in 
asset_list), asset_list
 
 
-def test_cli_assets_alias_list(parser: ArgumentParser, stdout_capture) -> None:
+def test_cli_assets_alias_list(prepare_examples, parser: ArgumentParser, 
stdout_capture) -> None:
     args = parser.parse_args(["assets", "list", "--alias", "--output=json"])
     with stdout_capture as capture:
         asset_command.asset_list(args)
@@ -78,7 +75,7 @@ def test_cli_assets_alias_list(parser: ArgumentParser, 
stdout_capture) -> None:
     assert any(alias["name"] == "example-alias" for alias in alias_list), 
alias_list
 
 
-def test_cli_assets_details(parser: ArgumentParser, stdout_capture) -> None:
+def test_cli_assets_details(prepare_examples, parser: ArgumentParser, 
stdout_capture) -> None:
     args = parser.parse_args(["assets", "details", "--name=asset1_producer", 
"--output=json"])
     with stdout_capture as capture:
         asset_command.asset_details(args)
@@ -107,7 +104,7 @@ def test_cli_assets_details(parser: ArgumentParser, 
stdout_capture) -> None:
     }
 
 
-def test_cli_assets_alias_details(parser: ArgumentParser, stdout_capture) -> 
None:
+def test_cli_assets_alias_details(prepare_examples, parser: ArgumentParser, 
stdout_capture) -> None:
     args = parser.parse_args(["assets", "details", "--alias", 
"--name=example-alias", "--output=json"])
     with stdout_capture as capture:
         asset_command.asset_details(args)
@@ -124,85 +121,46 @@ def test_cli_assets_alias_details(parser: ArgumentParser, 
stdout_capture) -> Non
     }
 
 
-@mock.patch("airflow.api_fastapi.core_api.datamodels.dag_versions.hasattr")
-def test_cli_assets_materialize(mock_hasattr, parser: ArgumentParser, 
stdout_capture) -> None:
-    mock_hasattr.return_value = False
-    args = parser.parse_args(["assets", "materialize", 
"--name=asset1_producer", "--output=json"])
-    with stdout_capture as capture:
-        asset_command.asset_materialize(args)
-
-    output = capture.getvalue()
-
-    # Check if output is empty first
-    assert output, "No output captured from asset_materialize command"
-
-    run_list = json.loads(output)
-    assert len(run_list) == 1
-
-    # No good way to statically compare these.
-    undeterministic: dict = {
-        "dag_run_id": None,
-        "dag_versions": [],
-        "data_interval_end": None,
-        "data_interval_start": None,
-        "logical_date": None,
-        "queued_at": None,
-        "run_after": "2025-02-12T19:27:59.066046Z",
-    }
-
-    assert run_list[0] | undeterministic == undeterministic | {
-        "conf": {},
-        "bundle_version": None,
-        "dag_display_name": "asset1_producer",
-        "dag_id": "asset1_producer",
-        "end_date": None,
-        "duration": None,
-        "last_scheduling_decision": None,
-        "note": None,
-        "partition_key": None,
-        "run_type": "asset_materialization",
-        "start_date": None,
-        "state": "queued",
-        "triggered_by": "cli",
-        "triggering_user_name": "root",
-        "run_after": "2025-02-12T19:27:59.066046Z",
-    }
-
-
-def test_cli_assets_materialize_with_view_url_template(parser: ArgumentParser, 
stdout_capture) -> None:
-    args = parser.parse_args(["assets", "materialize", 
"--name=asset1_producer", "--output=json"])
-    with stdout_capture as capture:
-        asset_command.asset_materialize(args)
-
-    output = capture.getvalue()
-    run_list = json.loads(output)
-    assert len(run_list) == 1
-
-    # No good way to statically compare these.
-    undeterministic: dict = {
-        "dag_run_id": None,
-        "dag_versions": [],
-        "data_interval_end": None,
-        "data_interval_start": None,
-        "logical_date": None,
-        "queued_at": None,
-        "run_after": "2025-02-12T19:27:59.066046Z",
-    }
-
-    assert run_list[0] | undeterministic == undeterministic | {
-        "conf": {},
-        "bundle_version": None,
-        "dag_display_name": "asset1_producer",
-        "dag_id": "asset1_producer",
-        "end_date": None,
-        "duration": None,
-        "last_scheduling_decision": None,
-        "note": None,
-        "partition_key": None,
-        "run_type": "asset_materialization",
-        "start_date": None,
-        "state": "queued",
-        "triggered_by": "cli",
-        "triggering_user_name": "root",
-        "run_after": "2025-02-12T19:27:59.066046Z",
-    }
+@pytest.mark.non_db_test_override
+class TestCliAssetsMaterialize:
+    """`assets materialize` goes through the airflowctl client; mocked here 
(no DB/server)."""
+
+    def test_materialize(self, parser: ArgumentParser, mock_cli_api_client, 
stdout_capture) -> None:
+        mock_cli_api_client.assets.list.return_value.assets = [
+            SimpleNamespace(id=7, name="asset1_producer", 
uri="s3://bucket/asset1_producer"),
+            SimpleNamespace(id=8, name="other", uri="s3://bucket/other"),
+        ]
+        
mock_cli_api_client.assets.materialize.return_value.model_dump.return_value = {
+            "dag_id": "asset1_producer",
+            "run_type": "asset_materialization",
+            "state": "queued",
+        }
+        args = parser.parse_args(["assets", "materialize", 
"--name=asset1_producer", "--output=json"])
+        with stdout_capture as capture:
+            asset_command.asset_materialize(args)
+
+        run_list = json.loads(capture.getvalue())
+        assert len(run_list) == 1
+        assert run_list[0]["dag_id"] == "asset1_producer"
+        # The asset is resolved to its id and materialization is delegated to 
the API server.
+        
mock_cli_api_client.assets.materialize.assert_called_once_with(asset_id="7")
+
+    def test_materialize_requires_name_or_uri(self, parser: ArgumentParser, 
mock_cli_api_client) -> None:
+        with pytest.raises(SystemExit, match="Either --name or --uri is 
required"):
+            asset_command.asset_materialize(parser.parse_args(["assets", 
"materialize"]))
+        mock_cli_api_client.assets.materialize.assert_not_called()
+
+    def test_materialize_missing(self, parser: ArgumentParser, 
mock_cli_api_client) -> None:
+        mock_cli_api_client.assets.list.return_value.assets = []
+        with pytest.raises(SystemExit, match="Asset with name nope does not 
exist"):
+            asset_command.asset_materialize(parser.parse_args(["assets", 
"materialize", "--name=nope"]))
+        mock_cli_api_client.assets.materialize.assert_not_called()
+
+    def test_materialize_ambiguous(self, parser: ArgumentParser, 
mock_cli_api_client) -> None:
+        mock_cli_api_client.assets.list.return_value.assets = [
+            SimpleNamespace(id=1, name="dup", uri="s3://a"),
+            SimpleNamespace(id=2, name="dup", uri="s3://b"),
+        ]
+        with pytest.raises(SystemExit, match="More than one asset exists with 
name dup"):
+            asset_command.asset_materialize(parser.parse_args(["assets", 
"materialize", "--name=dup"]))
+        mock_cli_api_client.assets.materialize.assert_not_called()
diff --git a/airflow-core/tests/unit/cli/commands/test_command_deprecations.py 
b/airflow-core/tests/unit/cli/commands/test_command_deprecations.py
new file mode 100644
index 00000000000..b4eb6840c90
--- /dev/null
+++ b/airflow-core/tests/unit/cli/commands/test_command_deprecations.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Single source of truth for the ``airflow`` CLI commands deprecated in favour 
of ``airflowctl``.
+
+Every command decorated with ``deprecated_for_airflowctl`` must have one entry 
below. When a new
+command is migrated and deprecated, add a row to ``DEPRECATED_CLI_COMMANDS`` 
-- the test then
+verifies it emits ``RemovedInAirflow4Warning`` pointing at the right 
``airflowctl`` command.
+"""
+
+from __future__ import annotations
+
+import contextlib
+import re
+
+import pytest
+
+from airflow.cli.commands import asset_command, dag_command, pool_command
+from airflow.exceptions import RemovedInAirflow4Warning
+
+# (command callable, argv to parse, expected airflowctl replacement named in 
the warning)
+DEPRECATED_CLI_COMMANDS = [
+    (dag_command.dag_trigger, ["dags", "trigger", "example_dag", 
"--run-id=x"], "airflowctl dags trigger"),
+    (dag_command.dag_delete, ["dags", "delete", "example_dag", "--yes"], 
"airflowctl dags delete"),
+    (pool_command.pool_list, ["pools", "list"], "airflowctl pools list"),
+    (pool_command.pool_get, ["pools", "get", "foo"], "airflowctl pools get"),
+    (pool_command.pool_set, ["pools", "set", "foo", "1", "desc"], "airflowctl 
pools create"),
+    (pool_command.pool_delete, ["pools", "delete", "foo"], "airflowctl pools 
delete"),
+    (pool_command.pool_import, ["pools", "import", "/nonexistent.json"], 
"airflowctl pools import"),
+    (
+        pool_command.pool_export,
+        ["pools", "export", "/tmp/airflow_pools_export.json"],
+        "airflowctl pools export",
+    ),
+    (
+        asset_command.asset_materialize,
+        ["assets", "materialize", "--name=foo"],
+        "airflowctl assets materialize",
+    ),
+]
+
+
+@pytest.mark.parametrize(
+    ("command", "argv", "replacement"),
+    DEPRECATED_CLI_COMMANDS,
+    ids=[argv[0] + "-" + argv[1] for _, argv, _ in DEPRECATED_CLI_COMMANDS],
+)
+def test_deprecated_cli_command_points_to_airflowctl(command, argv, 
replacement, parser, mock_cli_api_client):
+    """Each migrated command warns it will become an alias for its 
``airflowctl`` counterpart.
+
+    We only assert the deprecation warning fires (and names the right 
replacement); the command
+    body itself is exercised by the per-command test modules, so any error it 
raises against the
+    bare mocked client is irrelevant here and suppressed.
+    """
+    with pytest.warns(RemovedInAirflow4Warning, match=re.escape(replacement)):
+        with contextlib.suppress(Exception, SystemExit):
+            command(parser.parse_args(argv))
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py 
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 6f76034a421..2dafed8f15e 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -25,13 +25,14 @@ from datetime import datetime, timedelta
 from unittest import mock
 from unittest.mock import MagicMock
 
+import httpx
 import msgspec
 import pendulum
 import pytest
 import time_machine
-from sqlalchemy import func, select
+from airflowctl.api.operations import ServerResponseError
+from sqlalchemy import select
 
-from airflow import settings
 from airflow._shared.timezones import timezone
 from airflow.cli import cli_parser
 from airflow.cli.commands import dag_command
@@ -485,21 +486,19 @@ class TestCliDags:
         assert str(path_to_parse) in log_output
         assert "[0 100 * * *] is not acceptable, out of range" in log_output
 
-    def test_cli_list_dag_runs(self):
-        dag_command.dag_trigger(
-            self.parser.parse_args(
-                [
-                    "dags",
-                    "trigger",
-                    "example_bash_operator",
-                ]
-            )
-        )
+    def test_cli_list_dag_runs(self, dag_maker):
+        # Seed a run directly in the DB; ``dags trigger`` now goes through the 
API server
+        # (airflowctl client) and cannot be used as an in-process fixture here.
+        with dag_maker("test_list_dag_runs", start_date=DEFAULT_DATE, 
serialized=True):
+            EmptyOperator(task_id="t1")
+        dag_maker.create_dagrun(state=DagRunState.SUCCESS, 
logical_date=DEFAULT_DATE)
+        dag_maker.sync_dagbag_to_db()
+
         args = self.parser.parse_args(
             [
                 "dags",
                 "list-runs",
-                "example_bash_operator",
+                "test_list_dag_runs",
                 "--no-backfill",
                 "--start-date",
                 DEFAULT_DATE.isoformat(),
@@ -592,206 +591,6 @@ class TestCliDags:
         out = temp_stdout.splitlines()[-1]
         assert out == "No unpaused DAGs were found"
 
-    def test_trigger_dag(self):
-        dag_command.dag_trigger(
-            self.parser.parse_args(
-                [
-                    "dags",
-                    "trigger",
-                    "example_bash_operator",
-                    "--run-id=test_trigger_dag",
-                    '--conf={"foo": "bar"}',
-                ],
-            ),
-        )
-        with create_session() as session:
-            dagrun = session.scalars(select(DagRun).where(DagRun.run_id == 
"test_trigger_dag")).one()
-
-        assert dagrun, "DagRun not created"
-        assert dagrun.run_type == DagRunType.MANUAL
-        assert dagrun.conf == {"foo": "bar"}
-
-        # logical_date is None as it's not provided
-        assert dagrun.logical_date is None
-
-        # data_interval is None as logical_date is None
-        assert dagrun.data_interval_start is None
-        assert dagrun.data_interval_end is None
-
-    def test_trigger_dag_empty_object_conf(self):
-        dag_command.dag_trigger(
-            self.parser.parse_args(
-                [
-                    "dags",
-                    "trigger",
-                    "example_bash_operator",
-                    "--run-id=test_trigger_dag_empty_object_conf",
-                    "--conf={}",
-                ],
-            ),
-        )
-        with create_session() as session:
-            dagrun = session.scalars(
-                select(DagRun).where(DagRun.run_id == 
"test_trigger_dag_empty_object_conf")
-            ).one()
-
-        assert dagrun.conf == {}
-
-    def test_trigger_dag_json_null_conf(self):
-        dag_command.dag_trigger(
-            self.parser.parse_args(
-                [
-                    "dags",
-                    "trigger",
-                    "example_bash_operator",
-                    "--run-id=test_trigger_dag_json_null_conf",
-                    "--conf=null",
-                ],
-            ),
-        )
-        with create_session() as session:
-            dagrun = session.scalars(
-                select(DagRun).where(DagRun.run_id == 
"test_trigger_dag_json_null_conf")
-            ).one()
-
-        assert dagrun.conf == {}
-
-    def test_trigger_dag_with_microseconds(self):
-        dag_command.dag_trigger(
-            self.parser.parse_args(
-                [
-                    "dags",
-                    "trigger",
-                    "example_bash_operator",
-                    "--run-id=test_trigger_dag_with_micro",
-                    "--logical-date=2021-06-04T09:00:00.000001+08:00",
-                    "--no-replace-microseconds",
-                ],
-            )
-        )
-
-        with create_session() as session:
-            dagrun = session.scalars(
-                select(DagRun).where(DagRun.run_id == 
"test_trigger_dag_with_micro")
-            ).one()
-
-        assert dagrun, "DagRun not created"
-        assert dagrun.run_type == DagRunType.MANUAL
-        assert dagrun.logical_date.isoformat(timespec="microseconds") == 
"2021-06-04T01:00:00.000001+00:00"
-
-    @pytest.mark.parametrize("conf", ["NOT JSON", ""])
-    def test_trigger_dag_invalid_conf(self, conf):
-        with pytest.raises(ValueError, match=r"Expecting value: line \d+ 
column \d+ \(char \d+\)"):
-            dag_command.dag_trigger(
-                self.parser.parse_args(
-                    [
-                        "dags",
-                        "trigger",
-                        "example_bash_operator",
-                        "--run-id",
-                        "trigger_dag_xxx",
-                        "--conf",
-                        conf,
-                    ]
-                ),
-            )
-
-    @pytest.mark.parametrize("conf", ["[]", '"str"', "1", "false"])
-    def test_trigger_dag_rejects_non_object_conf(self, conf):
-        with pytest.raises(ValueError, match="DagRun conf must be a JSON 
object or null"):
-            dag_command.dag_trigger(
-                self.parser.parse_args(
-                    [
-                        "dags",
-                        "trigger",
-                        "example_bash_operator",
-                        "--run-id",
-                        "trigger_dag_xxx",
-                        "--conf",
-                        conf,
-                    ]
-                ),
-            )
-
-    def test_trigger_dag_output_as_json(self, stdout_capture):
-        args = self.parser.parse_args(
-            [
-                "dags",
-                "trigger",
-                "example_bash_operator",
-                "--run-id",
-                "trigger_dag_xxx",
-                "--conf",
-                '{"conf1": "val1", "conf2": "val2"}',
-                "--output=json",
-            ]
-        )
-        with stdout_capture as temp_stdout:
-            dag_command.dag_trigger(args)
-            # get the last line from the logs ignoring all logging lines
-            out = temp_stdout.getvalue().strip().splitlines()[-1]
-        parsed_out = json.loads(out)
-
-        assert len(parsed_out) == 1
-        assert parsed_out[0]["dag_id"] == "example_bash_operator"
-        assert parsed_out[0]["dag_run_id"] == "trigger_dag_xxx"
-        assert parsed_out[0]["conf"] == {"conf1": "val1", "conf2": "val2"}
-
-    def test_delete_dag(self):
-        DM = DagModel
-        key = "my_dag_id"
-        session = settings.Session()
-        session.add(DM(dag_id=key, bundle_name="dags-folder"))
-        session.commit()
-        dag_command.dag_delete(self.parser.parse_args(["dags", "delete", key, 
"--yes"]))
-        assert 
session.scalar(select(func.count()).select_from(DM).where(DM.dag_id == key)) == 0
-        with pytest.raises(AirflowException):
-            dag_command.dag_delete(
-                self.parser.parse_args(["dags", "delete", 
"does_not_exist_dag", "--yes"]),
-            )
-
-    def test_dag_delete_when_backfill_and_dagrun_exist(self):
-        # Test to check that the DAG should be deleted even if
-        # there are backfill records associated with it.
-        from airflow.models.backfill import Backfill
-
-        DM = DagModel
-        key = "my_dag_id"
-        session = settings.Session()
-        session.add(DM(dag_id=key, bundle_name="dags-folder"))
-        _backfill = Backfill(dag_id=key, from_date=DEFAULT_DATE, 
to_date=DEFAULT_DATE + timedelta(days=1))
-        session.add(_backfill)
-        # To create the backfill_id in DagRun
-        session.flush()
-        session.add(
-            DagRun(
-                dag_id=key,
-                run_id="backfill__" + key,
-                state=DagRunState.SUCCESS,
-                run_type="backfill",
-                backfill_id=_backfill.id,
-            )
-        )
-        session.commit()
-        dag_command.dag_delete(self.parser.parse_args(["dags", "delete", key, 
"--yes"]))
-        assert 
session.scalar(select(func.count()).select_from(DM).where(DM.dag_id == key)) == 0
-        with pytest.raises(AirflowException):
-            dag_command.dag_delete(
-                self.parser.parse_args(["dags", "delete", 
"does_not_exist_dag", "--yes"]),
-            )
-
-    def test_delete_dag_existing_file(self, tmp_path):
-        # Test to check that the DAG should be deleted even if
-        # the file containing it is not deleted
-        path = tmp_path / "testfile"
-        DM = DagModel
-        key = "my_dag_id"
-        session = settings.Session()
-        session.add(DM(dag_id=key, bundle_name="dags-folder", 
fileloc=os.fspath(path)))
-        session.commit()
-        dag_command.dag_delete(self.parser.parse_args(["dags", "delete", key, 
"--yes"]))
-        assert 
session.scalar(select(func.count()).select_from(DM).where(DM.dag_id == key)) == 0
-
     def test_cli_list_jobs(self):
         args = self.parser.parse_args(["dags", "list-jobs"])
         dag_command.dag_list_jobs(args)
@@ -1991,3 +1790,142 @@ class TestDagDetailsIsBackfillable:
         )
         dag_details = dag_command._get_dagbag_dag_details(dag)
         assert dag_details["is_backfillable"] is expected
+
+
+def _server_error(status_code: int) -> ServerResponseError:
+    request = httpx.Request("DELETE", "http://testserver/api/v2/dags/foo")
+    response = httpx.Response(status_code, request=request, json={"detail": 
"boom"})
+    return ServerResponseError(message="boom", request=request, 
response=response)
+
+
+@pytest.mark.non_db_test_override
+class TestCliDagsApiClientCommands:
+    """Dag CLI commands that talk to the API server through the airflowctl 
client.
+
+    These are unit tests: the airflowctl client is mocked so no API server (or
+    database) is required.
+    """
+
+    @classmethod
+    def setup_class(cls):
+        cls.parser = cli_parser.get_parser()
+
+    @pytest.fixture(autouse=True)
+    def _default_trigger_response(self, mock_cli_api_client):
+        """Give the mocked ``dags.trigger`` a dict response so ``print_as`` 
can render it."""
+        mock_cli_api_client.dags.trigger.return_value.model_dump.return_value 
= {
+            "dag_id": "example_bash_operator",
+            "dag_run_id": "test_run",
+        }
+
+    def test_trigger_dag(self, mock_cli_api_client):
+        dag_command.dag_trigger(
+            self.parser.parse_args(
+                [
+                    "dags",
+                    "trigger",
+                    "example_bash_operator",
+                    "--run-id=test_trigger_dag",
+                    '--conf={"foo": "bar"}',
+                ]
+            ),
+        )
+        mock_cli_api_client.dags.trigger.assert_called_once()
+        call = mock_cli_api_client.dags.trigger.call_args
+        assert call.kwargs["dag_id"] == "example_bash_operator"
+        body = call.kwargs["trigger_dag_run"]
+        assert body.dag_run_id == "test_trigger_dag"
+        assert body.conf == {"foo": "bar"}
+        # logical_date is None as it's not provided
+        assert body.logical_date is None
+
+    def test_trigger_dag_empty_object_conf(self, mock_cli_api_client):
+        dag_command.dag_trigger(
+            self.parser.parse_args(
+                ["dags", "trigger", "example_bash_operator", 
"--run-id=empty_conf", "--conf={}"]
+            ),
+        )
+        body = 
mock_cli_api_client.dags.trigger.call_args.kwargs["trigger_dag_run"]
+        assert body.conf == {}
+
+    def test_trigger_dag_json_null_conf(self, mock_cli_api_client):
+        dag_command.dag_trigger(
+            self.parser.parse_args(
+                ["dags", "trigger", "example_bash_operator", 
"--run-id=null_conf", "--conf=null"]
+            ),
+        )
+        # ``null`` conf resolves to None on the client; the API server coerces 
it to {}.
+        body = 
mock_cli_api_client.dags.trigger.call_args.kwargs["trigger_dag_run"]
+        assert body.conf is None
+
+    def test_trigger_dag_with_microseconds(self, mock_cli_api_client):
+        dag_command.dag_trigger(
+            self.parser.parse_args(
+                [
+                    "dags",
+                    "trigger",
+                    "example_bash_operator",
+                    "--run-id=micro",
+                    "--logical-date=2021-06-04T09:00:00.000001+08:00",
+                ]
+            )
+        )
+        body = 
mock_cli_api_client.dags.trigger.call_args.kwargs["trigger_dag_run"]
+        assert body.logical_date.isoformat(timespec="microseconds") == 
"2021-06-04T09:00:00.000001+08:00"
+
+    @pytest.mark.parametrize("conf", ["NOT JSON", ""])
+    def test_trigger_dag_invalid_conf(self, mock_cli_api_client, conf):
+        with pytest.raises(ValueError, match=r"Expecting value: line \d+ 
column \d+ \(char \d+\)"):
+            dag_command.dag_trigger(
+                self.parser.parse_args(
+                    ["dags", "trigger", "example_bash_operator", "--run-id", 
"xxx", "--conf", conf]
+                ),
+            )
+        mock_cli_api_client.dags.trigger.assert_not_called()
+
+    @pytest.mark.parametrize("conf", ["[]", '"str"', "1", "false"])
+    def test_trigger_dag_rejects_non_object_conf(self, mock_cli_api_client, 
conf):
+        with pytest.raises(ValueError, match="DagRun conf must be a JSON 
object or null"):
+            dag_command.dag_trigger(
+                self.parser.parse_args(
+                    ["dags", "trigger", "example_bash_operator", "--run-id", 
"xxx", "--conf", conf]
+                ),
+            )
+        mock_cli_api_client.dags.trigger.assert_not_called()
+
+    def test_trigger_dag_output_as_json(self, mock_cli_api_client, 
stdout_capture):
+        mock_cli_api_client.dags.trigger.return_value.model_dump.return_value 
= {
+            "dag_id": "example_bash_operator",
+            "dag_run_id": "trigger_dag_xxx",
+            "conf": {"conf1": "val1", "conf2": "val2"},
+        }
+        args = self.parser.parse_args(
+            [
+                "dags",
+                "trigger",
+                "example_bash_operator",
+                "--run-id",
+                "trigger_dag_xxx",
+                "--conf",
+                '{"conf1": "val1", "conf2": "val2"}',
+                "--output=json",
+            ]
+        )
+        with stdout_capture as temp_stdout:
+            dag_command.dag_trigger(args)
+            out = temp_stdout.getvalue().strip().splitlines()[-1]
+        parsed_out = json.loads(out)
+
+        assert len(parsed_out) == 1
+        assert parsed_out[0]["dag_id"] == "example_bash_operator"
+        assert parsed_out[0]["dag_run_id"] == "trigger_dag_xxx"
+        assert parsed_out[0]["conf"] == {"conf1": "val1", "conf2": "val2"}
+
+    def test_delete_dag(self, mock_cli_api_client):
+        dag_command.dag_delete(self.parser.parse_args(["dags", "delete", 
"my_dag_id", "--yes"]))
+        
mock_cli_api_client.dags.delete.assert_called_once_with(dag_id="my_dag_id")
+
+    def test_delete_dag_missing(self, mock_cli_api_client):
+        mock_cli_api_client.dags.delete.side_effect = _server_error(404)
+        with pytest.raises(ServerResponseError):
+            dag_command.dag_delete(self.parser.parse_args(["dags", "delete", 
"does_not_exist_dag", "--yes"]))
diff --git a/airflow-core/tests/unit/cli/commands/test_pool_command.py 
b/airflow-core/tests/unit/cli/commands/test_pool_command.py
index ad6951567fb..28e2d761812 100644
--- a/airflow-core/tests/unit/cli/commands/test_pool_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_pool_command.py
@@ -18,281 +18,235 @@
 from __future__ import annotations
 
 import json
+from types import SimpleNamespace
 
+import httpx
 import pytest
-from sqlalchemy import delete, func, select
+from airflowctl.api.operations import ServerResponseError
 
-from airflow import models, settings
 from airflow.cli import cli_parser
 from airflow.cli.commands import pool_command
-from airflow.models import Pool
-from airflow.settings import Session
-from airflow.utils.db import add_default_pool_if_not_exists
 
-pytestmark = pytest.mark.db_test
+from tests_common.test_utils.config import conf_vars
+
+
+def _pool(name, slots, description="", include_deferred=False, team_name=None):
+    """Build a stand-in for the airflowctl ``PoolResponse`` returned by the 
client."""
+    return SimpleNamespace(
+        name=name,
+        slots=slots,
+        description=description,
+        include_deferred=include_deferred,
+        team_name=team_name,
+    )
+
+
+def _server_error(status_code: int) -> ServerResponseError:
+    request = httpx.Request("GET", "http://testserver/api/v2/pools/foo")
+    response = httpx.Response(status_code, request=request, json={"detail": 
"boom"})
+    return ServerResponseError(message="boom", request=request, 
response=response)
 
 
 class TestCliPools:
     @classmethod
     def setup_class(cls):
-        cls.dagbag = models.DagBag()
         cls.parser = cli_parser.get_parser()
-        settings.configure_orm()
-        cls.session = Session
-        cls._cleanup()
-
-    def tearDown(self):
-        self._cleanup()
-
-    @staticmethod
-    def _cleanup(session=None):
-        if session is None:
-            session = Session()
-        session.execute(delete(Pool).where(Pool.pool != 
Pool.DEFAULT_POOL_NAME))
-        session.commit()
-        add_default_pool_if_not_exists()
-        session.close()
-
-    def test_pool_list(self, stdout_capture):
-        pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", 
"1", "test"]))
+
+    def test_pool_list(self, mock_cli_api_client, stdout_capture):
+        mock_cli_api_client.pools.list.return_value.pools = [_pool("foo", 1, 
"test")]
         with stdout_capture as stdout:
             pool_command.pool_list(self.parser.parse_args(["pools", "list"]))
 
         assert "foo" in stdout.getvalue()
+        mock_cli_api_client.pools.list.assert_called_once()
 
-    def test_pool_list_with_args(self):
+    def test_pool_list_with_args(self, mock_cli_api_client):
+        mock_cli_api_client.pools.list.return_value.pools = [_pool("foo", 1, 
"test")]
         pool_command.pool_list(self.parser.parse_args(["pools", "list", 
"--output", "json"]))
 
-    def test_pool_create(self):
+    def test_pool_create(self, mock_cli_api_client):
         pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", 
"1", "test"]))
-        assert self.session.scalar(select(func.count()).select_from(Pool)) == 2
 
-    def test_pool_update_deferred(self):
-        pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", 
"1", "test"]))
-        assert self.session.scalar(select(Pool).where(Pool.pool == 
"foo")).include_deferred is False
+        mock_cli_api_client.pools.create.assert_called_once()
+        body = mock_cli_api_client.pools.create.call_args.kwargs["pool"]
+        # core_api PoolBody exposes the name via the ``pool`` attribute (alias 
``name``).
+        assert body.pool == "foo"
+        assert body.slots == 1
+        assert body.description == "test"
+        assert body.include_deferred is False
 
+    def test_pool_create_include_deferred(self, mock_cli_api_client):
         pool_command.pool_set(
             self.parser.parse_args(["pools", "set", "foo", "1", "test", 
"--include-deferred"])
         )
-        assert self.session.scalar(select(Pool).where(Pool.pool == 
"foo")).include_deferred is True
 
-        pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", 
"1", "test"]))
-        assert self.session.scalar(select(Pool).where(Pool.pool == 
"foo")).include_deferred is False
+        body = mock_cli_api_client.pools.create.call_args.kwargs["pool"]
+        assert body.include_deferred is True
 
-    def test_pool_get(self):
-        pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", 
"1", "test"]))
-        pool_command.pool_get(self.parser.parse_args(["pools", "get", "foo"]))
+    def test_pool_get(self, mock_cli_api_client, stdout_capture):
+        mock_cli_api_client.pools.get.return_value = _pool("foo", 1, "test")
+        with stdout_capture as stdout:
+            pool_command.pool_get(self.parser.parse_args(["pools", "get", 
"foo"]))
 
-    def test_pool_delete(self):
-        pool_command.pool_set(self.parser.parse_args(["pools", "set", "foo", 
"1", "test"]))
+        assert "foo" in stdout.getvalue()
+        mock_cli_api_client.pools.get.assert_called_once_with(pool_name="foo")
+
+    def test_pool_get_missing(self, mock_cli_api_client):
+        mock_cli_api_client.pools.get.side_effect = _server_error(404)
+        with pytest.raises(SystemExit, match="Pool foo does not exist"):
+            pool_command.pool_get(self.parser.parse_args(["pools", "get", 
"foo"]))
+
+    def test_pool_get_other_error_reraised(self, mock_cli_api_client):
+        mock_cli_api_client.pools.get.side_effect = _server_error(500)
+        with pytest.raises(ServerResponseError):
+            pool_command.pool_get(self.parser.parse_args(["pools", "get", 
"foo"]))
+
+    def test_pool_delete(self, mock_cli_api_client):
         pool_command.pool_delete(self.parser.parse_args(["pools", "delete", 
"foo"]))
-        assert self.session.scalar(select(func.count()).select_from(Pool)) == 1
+        mock_cli_api_client.pools.delete.assert_called_once_with(pool="foo")
+
+    def test_pool_delete_missing(self, mock_cli_api_client):
+        mock_cli_api_client.pools.delete.side_effect = _server_error(404)
+        with pytest.raises(SystemExit, match="Pool foo does not exist"):
+            pool_command.pool_delete(self.parser.parse_args(["pools", 
"delete", "foo"]))
 
-    def test_pool_import_nonexistent(self):
+    def test_pool_import_nonexistent(self, mock_cli_api_client):
         with pytest.raises(SystemExit):
             pool_command.pool_import(self.parser.parse_args(["pools", 
"import", "nonexistent.json"]))
 
-    def test_pool_import_invalid_json(self, tmp_path):
+    def test_pool_import_invalid_json(self, mock_cli_api_client, tmp_path):
         invalid_pool_import_file_path = tmp_path / "pools_import_invalid.json"
-        with open(invalid_pool_import_file_path, mode="w") as file:
-            file.write("not valid json")
+        invalid_pool_import_file_path.write_text("not valid json")
 
         with pytest.raises(SystemExit):
             pool_command.pool_import(
                 self.parser.parse_args(["pools", "import", 
str(invalid_pool_import_file_path)])
             )
 
-    def test_pool_import_invalid_pools(self, tmp_path):
+    def test_pool_import_invalid_pools(self, mock_cli_api_client, tmp_path):
         invalid_pool_import_file_path = tmp_path / "pools_import_invalid.json"
+        # Missing ``slots`` makes the entry invalid.
         pool_config_input = {"foo": {"description": "foo_test", 
"include_deferred": False}}
-        with open(invalid_pool_import_file_path, mode="w") as file:
-            json.dump(pool_config_input, file)
+        invalid_pool_import_file_path.write_text(json.dumps(pool_config_input))
 
         with pytest.raises(SystemExit):
             pool_command.pool_import(
                 self.parser.parse_args(["pools", "import", 
str(invalid_pool_import_file_path)])
             )
 
-    def test_pool_import_backwards_compatibility(self, tmp_path):
+    def test_pool_import(self, mock_cli_api_client, tmp_path):
         pool_import_file_path = tmp_path / "pools_import.json"
         pool_config_input = {
-            # JSON before version 2.7.0 does not contain `include_deferred`
-            "foo": {"description": "foo_test", "slots": 1},
+            "foo": {"description": "foo_test", "slots": 1, "include_deferred": 
True},
+            # JSON before version 2.7.0 does not contain ``include_deferred``.
+            "bar": {"description": "bar_test", "slots": 2},
         }
-        with open(pool_import_file_path, mode="w") as file:
-            json.dump(pool_config_input, file)
+        pool_import_file_path.write_text(json.dumps(pool_config_input))
 
         pool_command.pool_import(self.parser.parse_args(["pools", "import", 
str(pool_import_file_path)]))
 
-        assert self.session.scalar(select(Pool).where(Pool.pool == 
"foo")).include_deferred is False
-
-    def test_pool_import_export(self, tmp_path):
-        pool_import_file_path = tmp_path / "pools_import.json"
-        pool_export_file_path = tmp_path / "pools_export.json"
-        pool_config_input = {
-            "foo": {"description": "foo_test", "slots": 1, "include_deferred": 
True},
-            "default_pool": {
-                "description": "Default pool",
-                "slots": 128,
-                "include_deferred": False,
-            },
-            "baz": {"description": "baz_test", "slots": 2, "include_deferred": 
False},
+        assert mock_cli_api_client.pools.create.call_count == 2
+        bodies = {
+            call.kwargs["pool"].pool: call.kwargs["pool"]
+            for call in mock_cli_api_client.pools.create.call_args_list
         }
-        with open(pool_import_file_path, mode="w") as file:
-            json.dump(pool_config_input, file)
+        assert bodies["foo"].include_deferred is True
+        # Missing ``include_deferred`` defaults to False (backwards 
compatibility).
+        assert bodies["bar"].include_deferred is False
 
-        # Import json
-        pool_command.pool_import(self.parser.parse_args(["pools", "import", 
str(pool_import_file_path)]))
+    def test_pool_export(self, mock_cli_api_client, tmp_path):
+        pool_export_file_path = tmp_path / "pools_export.json"
+        mock_cli_api_client.pools.list.return_value.pools = [
+            _pool("foo", 1, "foo_test", include_deferred=True),
+            _pool("baz", 2, "baz_test", include_deferred=False),
+        ]
 
-        # Export json
         pool_command.pool_export(self.parser.parse_args(["pools", "export", 
str(pool_export_file_path)]))
 
-        with open(pool_export_file_path) as file:
-            pool_config_output = json.load(file)
-            assert pool_config_input == pool_config_output, "Input and output 
pool files are not same"
-
-    def test_pool_set_with_team_name(self):
-        """Test that pool_set with --team-name assigns the pool to the team 
when multi_team is enabled."""
-        from airflow.models.team import Team
+        exported = json.loads(pool_export_file_path.read_text())
+        assert exported == {
+            "foo": {"slots": 1, "description": "foo_test", "include_deferred": 
True},
+            "baz": {"slots": 2, "description": "baz_test", "include_deferred": 
False},
+        }
 
-        from tests_common.test_utils.config import conf_vars
+    def test_pool_set_with_team_name(self, mock_cli_api_client):
+        """``--team-name`` is forwarded to the airflowctl client when 
multi_team is enabled."""
+        with conf_vars({("core", "multi_team"): "True"}):
+            pool_command.pool_set(
+                self.parser.parse_args(
+                    ["pools", "set", "team_pool", "5", "team pool", 
"--team-name", "test_team"]
+                )
+            )
 
-        # Create the team first
-        team = Team(name="test_team")
-        self.session.add(team)
-        self.session.commit()
+        body = mock_cli_api_client.pools.create.call_args.kwargs["pool"]
+        assert body.team_name == "test_team"
 
-        try:
-            with conf_vars({("core", "multi_team"): "True"}):
+    def test_pool_set_team_name_rejected_when_multi_team_disabled(self, 
mock_cli_api_client):
+        """``PoolBody`` rejects a team_name (client-side) when multi_team is 
disabled."""
+        with conf_vars({("core", "multi_team"): "False"}):
+            with pytest.raises(ValueError, match="team_name cannot be set when 
multi_team mode is disabled"):
                 pool_command.pool_set(
                     self.parser.parse_args(
                         ["pools", "set", "team_pool", "5", "team pool", 
"--team-name", "test_team"]
                     )
                 )
+        mock_cli_api_client.pools.create.assert_not_called()
 
-            pool = self.session.scalar(select(Pool).where(Pool.pool == 
"team_pool"))
-            assert pool is not None
-            assert pool.team_name == "test_team"
-            assert pool.slots == 5
-        finally:
-            self.session.execute(delete(Pool).where(Pool.pool == "team_pool"))
-            self.session.execute(delete(Team).where(Team.name == "test_team"))
-            self.session.commit()
-
-    def test_pool_set_team_name_rejected_when_multi_team_disabled(self):
-        """Test that pool_set with --team-name raises when multi_team is 
disabled."""
-        from airflow.models.team import Team
-
-        from tests_common.test_utils.config import conf_vars
-
-        team = Team(name="test_team")
-        self.session.add(team)
-        self.session.commit()
-
-        try:
-            with conf_vars({("core", "multi_team"): "False"}):
-                with pytest.raises(
-                    ValueError, match="team_name cannot be set when multi_team 
mode is disabled"
-                ):
-                    pool_command.pool_set(
-                        self.parser.parse_args(
-                            ["pools", "set", "team_pool", "5", "team pool", 
"--team-name", "test_team"]
-                        )
-                    )
-        finally:
-            self.session.execute(delete(Pool).where(Pool.pool == "team_pool"))
-            self.session.execute(delete(Team).where(Team.name == "test_team"))
-            self.session.commit()
-
-    def test_pool_set_without_team_name(self):
-        """Test that pool_set without --team-name leaves team_name as None."""
+    def test_pool_set_without_team_name(self, mock_cli_api_client):
+        """Without ``--team-name`` the forwarded body has ``team_name`` as 
None."""
         pool_command.pool_set(self.parser.parse_args(["pools", "set", 
"no_team_pool", "3", "no team"]))
 
-        pool = self.session.scalar(select(Pool).where(Pool.pool == 
"no_team_pool"))
-        assert pool is not None
-        assert pool.team_name is None
-
-    def test_pool_import_export_with_team_name(self, tmp_path):
-        """Test that import/export round-trips the team_name field."""
-        from airflow.models.team import Team
-
-        from tests_common.test_utils.config import conf_vars
-
-        team = Team(name="import_team")
-        self.session.add(team)
-        self.session.commit()
+        body = mock_cli_api_client.pools.create.call_args.kwargs["pool"]
+        assert body.team_name is None
 
+    def test_pool_import_forwards_team_name(self, mock_cli_api_client, 
tmp_path):
+        """Import forwards each pool's ``team_name`` (or None) to the 
airflowctl client."""
         pool_import_file_path = tmp_path / "pools_import_team.json"
-        pool_export_file_path = tmp_path / "pools_export_team.json"
-        pool_config_input = {
-            "team_pool_a": {
-                "slots": 10,
-                "description": "team pool",
-                "include_deferred": False,
-                "team_name": "import_team",
-            },
-            "global_pool": {
-                "slots": 5,
-                "description": "global pool",
-                "include_deferred": False,
-            },
-        }
-
-        with open(pool_import_file_path, mode="w") as file:
-            json.dump(pool_config_input, file)
-
-        try:
-            with conf_vars({("core", "multi_team"): "True"}):
-                pool_command.pool_import(
-                    self.parser.parse_args(["pools", "import", 
str(pool_import_file_path)])
-                )
-
-            # Verify team assignment
-            pool = self.session.scalar(select(Pool).where(Pool.pool == 
"team_pool_a"))
-            assert pool is not None
-            assert pool.team_name == "import_team"
-
-            global_pool = self.session.scalar(select(Pool).where(Pool.pool == 
"global_pool"))
-            assert global_pool is not None
-            assert global_pool.team_name is None
+        pool_import_file_path.write_text(
+            json.dumps(
+                {
+                    "team_pool_a": {
+                        "slots": 10,
+                        "description": "team pool",
+                        "include_deferred": False,
+                        "team_name": "import_team",
+                    },
+                    "global_pool": {"slots": 5, "description": "global pool", 
"include_deferred": False},
+                }
+            )
+        )
 
-            # Export and verify
-            pool_command.pool_export(self.parser.parse_args(["pools", 
"export", str(pool_export_file_path)]))
+        with conf_vars({("core", "multi_team"): "True"}):
+            pool_command.pool_import(self.parser.parse_args(["pools", 
"import", str(pool_import_file_path)]))
 
-            with open(pool_export_file_path) as file:
-                pool_config_output = json.load(file)
+        bodies = {
+            call.kwargs["pool"].pool: call.kwargs["pool"]
+            for call in mock_cli_api_client.pools.create.call_args_list
+        }
+        assert bodies["team_pool_a"].team_name == "import_team"
+        assert bodies["global_pool"].team_name is None
 
-            assert pool_config_output["team_pool_a"]["team_name"] == 
"import_team"
-            assert "team_name" not in pool_config_output["global_pool"]
-        finally:
-            
self.session.execute(delete(Pool).where(Pool.pool.in_(["team_pool_a", 
"global_pool"])))
-            self.session.execute(delete(Team).where(Team.name == 
"import_team"))
-            self.session.commit()
+    def test_pool_export_includes_team_name(self, mock_cli_api_client, 
tmp_path):
+        """Export writes ``team_name`` only for pools that have one."""
+        pool_export_file_path = tmp_path / "pools_export_team.json"
+        mock_cli_api_client.pools.list.return_value.pools = [
+            _pool("team_pool_a", 10, "team pool", team_name="import_team"),
+            _pool("global_pool", 5, "global pool"),
+        ]
 
-    def test_pool_list_shows_team_name(self, stdout_capture):
-        """Test that pool list output includes the team_name column."""
-        from airflow.models.team import Team
+        pool_command.pool_export(self.parser.parse_args(["pools", "export", 
str(pool_export_file_path)]))
 
-        from tests_common.test_utils.config import conf_vars
+        exported = json.loads(pool_export_file_path.read_text())
+        assert exported["team_pool_a"]["team_name"] == "import_team"
+        assert "team_name" not in exported["global_pool"]
 
-        team = Team(name="list_team")
-        self.session.add(team)
-        self.session.commit()
+    def test_pool_list_shows_team_name(self, mock_cli_api_client, 
stdout_capture):
+        """Pool list output includes the team_name column."""
+        mock_cli_api_client.pools.list.return_value.pools = [
+            _pool("list_pool", 5, "desc", team_name="list_team")
+        ]
 
-        try:
-            with conf_vars({("core", "multi_team"): "True"}):
-                pool_command.pool_set(
-                    self.parser.parse_args(
-                        ["pools", "set", "list_pool", "5", "desc", 
"--team-name", "list_team"]
-                    )
-                )
-
-            with stdout_capture as stdout:
-                pool_command.pool_list(self.parser.parse_args(["pools", 
"list"]))
+        with stdout_capture as stdout:
+            pool_command.pool_list(self.parser.parse_args(["pools", "list"]))
 
-            output = stdout.getvalue()
-            assert "list_team" in output
-        finally:
-            self.session.execute(delete(Pool).where(Pool.pool == "list_pool"))
-            self.session.execute(delete(Team).where(Team.name == "list_team"))
-            self.session.commit()
+        assert "list_team" in stdout.getvalue()
diff --git a/airflow-core/tests/unit/cli/conftest.py 
b/airflow-core/tests/unit/cli/conftest.py
index 7676a103b53..d9d2ae341eb 100644
--- a/airflow-core/tests/unit/cli/conftest.py
+++ b/airflow-core/tests/unit/cli/conftest.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import sys
+from unittest import mock
 
 import pytest
 
@@ -68,6 +69,25 @@ def parser():
 # log messages
 
 
+@pytest.fixture
+def mock_cli_api_client():
+    """Mock the CLI airflowctl client and neutralize ``action_cli``'s DB touch 
points.
+
+    CLI commands that go through the airflowctl client only need the mocked 
client; the
+    ``@action_cli`` audit logging and log-template sync would otherwise open a 
database
+    session. Patching them lets these command tests run without a database or 
API server.
+    """
+    client = mock.MagicMock()
+    with (
+        mock.patch("airflow.cli.api_client.get_cli_api_client", 
return_value=client),
+        mock.patch("airflow.utils.cli_action_loggers.on_pre_execution"),
+        mock.patch("airflow.utils.cli_action_loggers.on_post_execution"),
+        mock.patch("airflow.utils.db.synchronize_log_template"),
+        mock.patch("airflow.utils.db.check_and_run_migrations"),
+    ):
+        yield client
+
+
 @pytest.fixture
 def stdout_capture(request):
     """Fixture that captures stdout only."""
diff --git a/airflow-core/tests/unit/cli/test_api_client.py 
b/airflow-core/tests/unit/cli/test_api_client.py
new file mode 100644
index 00000000000..3ef813cebda
--- /dev/null
+++ b/airflow-core/tests/unit/cli/test_api_client.py
@@ -0,0 +1,140 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
+from airflow.api_fastapi.auth.managers.simple.simple_auth_manager import 
SimpleAuthManager
+from airflow.cli import api_client as cli_api_client
+
+from tests_common.test_utils.config import conf_vars
+
+
+@pytest.fixture(autouse=True)
+def _reset_singleton():
+    """Reset the process-wide client singleton around each test."""
+    cli_api_client._api_client = None
+    yield
+    cli_api_client._api_client = None
+
+
+class TestResolveBaseUrl:
+    @conf_vars({("api", "base_url"): "https://airflow.example.com:9999"})
+    def test_explicit_base_url(self):
+        assert cli_api_client._resolve_base_url() == 
"https://airflow.example.com:9999"
+
+    @conf_vars({("api", "base_url"): "", ("api", "host"): "myhost", ("api", 
"port"): "1234"})
+    def test_host_port_fallback(self):
+        assert cli_api_client._resolve_base_url() == "http://myhost:1234"
+
+
+class TestMintCliToken:
+    def test_uses_env_token(self, monkeypatch):
+        monkeypatch.setenv("AIRFLOW_CLI_TOKEN", "tok-123")
+        with mock.patch("airflow.api_fastapi.app.get_auth_manager") as 
get_auth_manager:
+            assert cli_api_client._mint_cli_token() == "tok-123"
+            # The auth manager is never consulted when a token is supplied 
explicitly.
+            get_auth_manager.assert_not_called()
+
+    def test_mints_via_auth_manager(self, monkeypatch):
+        monkeypatch.delenv("AIRFLOW_CLI_TOKEN", raising=False)
+        auth_manager = mock.MagicMock()
+        auth_manager.get_cli_user.return_value = "cli-user"
+        auth_manager.generate_jwt.return_value = "signed-jwt"
+        with mock.patch("airflow.api_fastapi.app.get_auth_manager", 
return_value=auth_manager):
+            assert cli_api_client._mint_cli_token() == "signed-jwt"
+
+        auth_manager.generate_jwt.assert_called_once()
+        assert auth_manager.generate_jwt.call_args.args[0] == "cli-user"
+        # Token must be short-lived.
+        assert 
auth_manager.generate_jwt.call_args.kwargs["expiration_time_in_seconds"] > 0
+
+    def test_initializes_auth_manager_when_not_initialized(self, monkeypatch):
+        # In the CLI the auth manager singleton is usually not initialized 
yet, so
+        # ``get_auth_manager`` raises and we must initialize it on demand.
+        monkeypatch.delenv("AIRFLOW_CLI_TOKEN", raising=False)
+        auth_manager = mock.MagicMock()
+        auth_manager.get_cli_user.return_value = "cli-user"
+        auth_manager.generate_jwt.return_value = "signed-jwt"
+        with (
+            mock.patch(
+                "airflow.api_fastapi.app.get_auth_manager",
+                side_effect=RuntimeError("Auth Manager has not been 
initialized yet."),
+            ),
+            mock.patch(
+                "airflow.api_fastapi.app.init_auth_manager", 
return_value=auth_manager
+            ) as init_auth_manager,
+        ):
+            assert cli_api_client._mint_cli_token() == "signed-jwt"
+
+        init_auth_manager.assert_called_once()
+        auth_manager.generate_jwt.assert_called_once()
+
+
+class TestGetCliApiClient:
+    def test_builds_singleton(self):
+        with (
+            mock.patch.object(cli_api_client, "_resolve_base_url", 
return_value="http://h:8080"),
+            mock.patch.object(cli_api_client, "_mint_cli_token", 
return_value="tok"),
+            mock.patch.object(cli_api_client, "Client") as client_cls,
+        ):
+            first = cli_api_client.get_cli_api_client()
+            second = cli_api_client.get_cli_api_client()
+
+        assert first is second
+        client_cls.assert_called_once()
+        kwargs = client_cls.call_args.kwargs
+        assert kwargs["base_url"] == "http://h:8080"
+        assert kwargs["token"] == "tok"
+        assert kwargs["kind"] == cli_api_client.ClientKind.CLI
+
+
+class TestProvideApiClient:
+    def test_injects_when_missing(self):
+        with mock.patch.object(cli_api_client, "get_cli_api_client", 
return_value="CLIENT"):
+
+            @cli_api_client.provide_api_client
+            def command(args, api_client=None):
+                return api_client
+
+            assert command("args") == "CLIENT"
+
+    def test_uses_explicit_client(self):
+        with mock.patch.object(cli_api_client, "get_cli_api_client") as 
get_client:
+
+            @cli_api_client.provide_api_client
+            def command(args, api_client=None):
+                return api_client
+
+            assert command("args", api_client="EXPLICIT") == "EXPLICIT"
+            get_client.assert_not_called()
+
+
+class TestGetCliUser:
+    def test_base_default_raises(self):
+        # The generic auth manager cannot know which user is authorized for 
the CLI.
+        with pytest.raises(NotImplementedError, match="AIRFLOW_CLI_TOKEN"):
+            BaseAuthManager.get_cli_user(mock.Mock())
+
+    def test_simple_auth_manager_returns_admin(self):
+        user = SimpleAuthManager.get_cli_user(mock.Mock())
+        assert user.get_id() == "cli"
+        assert user.role == "ADMIN"
diff --git a/airflow-core/tests/unit/cli/test_utils.py 
b/airflow-core/tests/unit/cli/test_utils.py
new file mode 100644
index 00000000000..4fb137ad482
--- /dev/null
+++ b/airflow-core/tests/unit/cli/test_utils.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.cli.utils import deprecated_for_airflowctl
+from airflow.exceptions import RemovedInAirflow4Warning
+
+
+class TestDeprecatedForAirflowctl:
+    def test_emits_warning_naming_replacement(self):
+        @deprecated_for_airflowctl("airflowctl dags trigger")
+        def command(args):
+            return "result"
+
+        with pytest.warns(RemovedInAirflow4Warning, match="airflowctl dags 
trigger"):
+            result = command(args=None)
+
+        # The wrapped command still runs and returns its value.
+        assert result == "result"
+
+    def test_passes_through_args_and_preserves_metadata(self):
+        @deprecated_for_airflowctl("airflowctl pools create")
+        def command(a, b, *, c):
+            """Original docstring."""
+            return (a, b, c)
+
+        with pytest.warns(RemovedInAirflow4Warning):
+            assert command(1, 2, c=3) == (1, 2, 3)
+
+        assert command.__name__ == "command"
+        assert command.__doc__ == "Original docstring."
diff --git 
a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py 
b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
index 20b3be74846..e02a1a4cf81 100644
--- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
+++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py
@@ -300,6 +300,30 @@ class FabAuthManager(BaseAuthManager[User]):
     def serialize_user(self, user: User) -> dict[str, Any]:
         return {"sub": str(user.id)}
 
+    def get_cli_user(self) -> User:
+        """
+        Return an existing ``Admin`` user for the local CLI to mint a token 
for.
+
+        The Airflow CLI mints a short-lived, in-memory JWT for this user so it 
can talk to
+        the API server. FAB tokens reference a real database user, so we reuse 
an existing
+        ``Admin`` user rather than fabricating one. If none exists, the 
operator must
+        create one or provide a token via the ``AIRFLOW_CLI_TOKEN`` 
environment variable.
+        """
+        from airflow.utils.session import create_session
+
+        with create_session() as session:
+            user = 
session.scalars(select(User).join(User.roles).where(Role.name == 
"Admin").limit(1)).first()
+            if user is None:
+                raise AirflowConfigException(
+                    "No user with the 'Admin' role exists in the FAB database. 
Create one "
+                    "(e.g. `airflow fab create-user --role Admin ...`) or set 
the "
+                    "AIRFLOW_CLI_TOKEN environment variable with a valid API 
token."
+                )
+            # Detach so attributes stay accessible after the session closes 
(and is not
+            # expired on commit) while the CLI serializes the user to mint the 
token.
+            session.expunge(user)
+        return user
+
     def is_logged_in(self) -> bool:
         """Return whether the user is logged in."""
         user = self.get_user()
diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py 
b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
index ab3cb8ee264..ba647b6dd86 100644
--- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
+++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py
@@ -1298,3 +1298,29 @@ class TestFabAuthManagerSessionCleanupErrorHandling:
             mock_session.remove.assert_called()
             mock_log.warning.assert_called()
             assert response is not None
+
+
+class TestFabGetCliUser:
+    """``get_cli_user`` reuses an existing ``Admin`` user for the local CLI token."""
+
+    @mock.patch("airflow.utils.session.create_session")
+    def test_returns_admin_user(self, mock_create_session, auth_manager):
+        admin_user = MagicMock()
+        session = MagicMock()
+        session.scalars.return_value.first.return_value = admin_user
+        mock_create_session.return_value.__enter__.return_value = session
+
+        result = auth_manager.get_cli_user()
+
+        assert result is admin_user
+        # The user is detached so its attributes survive the session closing.
+        session.expunge.assert_called_once_with(admin_user)
+
+    @mock.patch("airflow.utils.session.create_session")
+    def test_raises_when_no_admin_user(self, mock_create_session, auth_manager):
+        session = MagicMock()
+        session.scalars.return_value.first.return_value = None
+        mock_create_session.return_value.__enter__.return_value = session
+
+        with pytest.raises(AirflowConfigException, match="Admin"):
+            auth_manager.get_cli_user()
diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py
index a8cd683ef46..bc9ebc305c3 100644
--- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py
+++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py
@@ -34,7 +34,7 @@ from urllib3.util import Retry
 
 from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX
 from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowConfigException, AirflowProviderDeprecationWarning
 
 try:
     from airflow.api_fastapi.auth.managers.base_auth_manager import ExtendedResourceMethod
@@ -141,6 +141,34 @@ class KeycloakAuthManager(BaseAuthManager[KeycloakAuthManagerUser]):
             "refresh_token": user.refresh_token,
         }
 
+    def get_cli_user(self) -> KeycloakAuthManagerUser:
+        """
+        Return a service-account user for the local CLI to mint a token for.
+
+        Keycloak tokens are issued by the external Keycloak server, so they cannot be
+        forged locally. The Keycloak client is already configured for Airflow to talk to
+        Keycloak, so we reuse it to obtain a service-account token through the
+        ``client_credentials`` flow. The service account's effective permissions are
+        governed by the Keycloak deployment. If the client credentials are not usable,
+        ``AirflowConfigException`` is raised; set the ``AIRFLOW_CLI_TOKEN`` environment variable instead.
+        """
+        try:
+            tokens = self.get_keycloak_client().token(grant_type="client_credentials")
+        except Exception as e:
+            raise AirflowConfigException(
+                "Could not obtain a Keycloak service-account token for the CLI via the "
+                "client_credentials flow. Set the AIRFLOW_CLI_TOKEN environment variable "
+                f"with a valid API token instead. Original error: {e}"
+            ) from e
+        return KeycloakAuthManagerUser(
+            user_id="airflow-cli",
+            name="airflow-cli",
+            access_token=tokens["access_token"],
+            # No refresh token is issued for the client_credentials flow (RFC 6749 §4.4.3),
+            # which marks this as a service account in refresh_user/refresh_tokens.
+            refresh_token=tokens.get("refresh_token"),
+        )
+
     def get_url_login(self, **kwargs) -> str:
         base_url = conf.get("api", "base_url", fallback="/")
         return urljoin(base_url, f"{AUTH_MANAGER_FASTAPI_APP_PREFIX}/login")
diff --git a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py
index 9c8ed9dd5b6..e4b6a5c294d 100644
--- a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py
+++ b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py
@@ -45,7 +45,7 @@ if AIRFLOW_V_3_2_PLUS:
 else:
     TeamDetails = None  # type: ignore[assignment,misc]
 from airflow.api_fastapi.common.types import MenuItem
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.exceptions import AirflowConfigException, AirflowProviderDeprecationWarning
 
 try:
     from airflow.providers.common.compat.sdk import AirflowException
@@ -121,6 +121,25 @@ def _clear_filter_cache():
 
 
 class TestKeycloakAuthManager:
+    @patch.object(KeycloakAuthManager, "get_keycloak_client")
+    def test_get_cli_user(self, mock_get_keycloak_client, auth_manager):
+        # client_credentials (service account) flow returns an access token and no refresh token.
+        mock_get_keycloak_client.return_value.token.return_value = {"access_token": "svc-token"}
+
+        user = auth_manager.get_cli_user()
+
+        assert user.get_id() == "airflow-cli"
+        assert user.access_token == "svc-token"
+        assert user.refresh_token is None
+        mock_get_keycloak_client.return_value.token.assert_called_once_with(grant_type="client_credentials")
+
+    @patch.object(KeycloakAuthManager, "get_keycloak_client")
+    def test_get_cli_user_raises_when_credentials_unusable(self, mock_get_keycloak_client, auth_manager):
+        mock_get_keycloak_client.return_value.token.side_effect = Exception("boom")
+
+        with pytest.raises(AirflowConfigException, match="AIRFLOW_CLI_TOKEN"):
+            auth_manager.get_cli_user()
+
     def test_deserialize_user(self, auth_manager):
         result = auth_manager.deserialize_user(
             {
diff --git a/scripts/ci/prek/known_airflow_exceptions.txt b/scripts/ci/prek/known_airflow_exceptions.txt
index 93aa6557c83..04c6e9534f0 100644
--- a/scripts/ci/prek/known_airflow_exceptions.txt
+++ b/scripts/ci/prek/known_airflow_exceptions.txt
@@ -1,7 +1,7 @@
 airflow-core/src/airflow/api/common/delete_dag.py::1
 airflow-core/src/airflow/api_fastapi/core_api/app.py::1
 airflow-core/src/airflow/cli/cli_parser.py::1
-airflow-core/src/airflow/cli/commands/dag_command.py::3
+airflow-core/src/airflow/cli/commands/dag_command.py::1
 airflow-core/src/airflow/cli/commands/db_command.py::1
 airflow-core/src/airflow/config_templates/airflow_local_settings.py::1
 airflow-core/src/airflow/dag_processing/dagbag.py::1

Reply via email to