This is an automated email from the ASF dual-hosted git repository. bugraoz pushed a commit to branch feat/47972/move-airflow-ctl-out-of-core in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 31a826434ed4474af1be7587bf86b11983560859 Author: Bugra Ozturk <[email protected]> AuthorDate: Thu Mar 20 03:21:49 2025 +0100 Create basic files and include basics, move api and related code to airflowctl --- airflowctl/README.md | 18 ++ airflowctl/pyproject.toml | 118 +++++++++++++ .../cli/api => airflowctl/src/airflow}/__init__.py | 1 + .../src/airflow/airflow}/api/client.py | 2 +- .../src/airflow/airflow}/api/operations.py | 4 +- .../cli => airflowctl/src/airflow}/api/__init__.py | 0 .../src/airflow}/api/datamodels/__init__.py | 0 .../src/airflow/api/datamodels/generated.py | 0 .../api => airflowctl/src/airflow/ctl}/__init__.py | 6 + airflowctl/src/airflow/ctl/cli_config.py | 195 +++++++++++++++++++++ airflowctl/src/airflow/ctl/cli_parser.py | 164 +++++++++++++++++ .../src/airflow/ctl/commands}/__init__.py | 0 .../airflow/ctl/commands/commands}/auth_command.py | 2 +- airflowctl/src/airflow/ctl/utils.py | 98 +++++++++++ {airflow/cli/api => airflowctl/tests}/__init__.py | 0 {airflow/cli => airflowctl/tests}/api/__init__.py | 0 {tests/cli => airflowctl/tests}/api/test_client.py | 4 +- .../tests}/api/test_operations.py | 4 +- .../api => airflowctl/tests/commands}/__init__.py | 0 .../tests/commands}/test_auth_command.py | 0 {tests/cli => airflowctl/tests}/conftest.py | 41 +---- hatch_build.py | 2 - pyproject.toml | 40 +---- .../api/datamodels/generated.py} | 0 tests/cli/conftest.py | 34 ---- 25 files changed, 618 insertions(+), 115 deletions(-) diff --git a/airflowctl/README.md b/airflowctl/README.md new file mode 100644 index 00000000000..ef14affc68c --- /dev/null +++ b/airflowctl/README.md @@ -0,0 +1,18 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + --> diff --git a/airflowctl/pyproject.toml b/airflowctl/pyproject.toml new file mode 100644 index 00000000000..a37a7f399bc --- /dev/null +++ b/airflowctl/pyproject.toml @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[project] +name = "apache-airflow-ctl" +dynamic = ["version"] +description = "Apache Airflow control interface for API." +readme = { file = "README.md", content-type = "text/markdown" } +requires-python = ">=3.9, <3.13" +dependencies = [ + # TODO there could be still missing deps such as airflow-core + "platformdirs>=4.3.6", + "keyring>=25.6.0", + "httpx>=0.25.0", + "uuid6>=2024.7.10", +] + +classifiers = [ + "Framework :: Apache Airflow", +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.version] +path = "src/airflow/airflowctl/__init__.py" + +[tool.hatch.build.targets.wheel] +packages = ["src/airflow"] +# This file only exists to make pyright/VSCode happy, don't ship it +exclude = ["src/airflow/__init__.py"] + +[tool.ruff] +extend = "../pyproject.toml" +src = ["src"] +namespace-packages = ["src/airflow"] + +[tool.ruff.lint.per-file-ignores] + +# Ignore Doc rules et al for anything outside of tests +"!src/*" = ["D", "TID253", "S101", "TRY002"] + +# Ignore the pytest rules outside the tests folder - https://github.com/astral-sh/ruff/issues/14205 +"!tests/*" = ["PT"] + +# Pycharm barfs if this "stub" file has future imports +"src/airflow/__init__.py" = ["I002"] + +# Generated file, be less strict +"airflow/api/datamodels/generated.py" = ["UP007", "D101", "D200"] + +[tool.coverage.run] +branch = true +relative_files = true +source = ["src/airflow"] +include_namespace_packages = true + +[tool.coverage.report] +skip_empty = true +exclude_also = [ + "def __repr__", + "raise AssertionError", + "raise NotImplementedError", + "if __name__ == .__main__.:", + "@(abc\\.)?abstractmethod", + "@(typing(_extensions)?\\.)?overload", + "if (typing(_extensions)?\\.)?TYPE_CHECKING:", +] + +[dependency-groups] +codegen = [ + "datamodel-code-generator[http]==0.28.2", +] +dev = [ + "apache-airflow", + "apache-airflow-devel-common", +] +[tool.uv.sources] + +# To use: +# +# TODO: Automate this in CI via pre-commit hook and generate the file each time +# The API should be running in the background to serve the OpenAPI schema +# uv run --group codegen --project apache-airflow --directory airflow/ datamodel-codegen +[tool.datamodel-codegen] +capitalise-enum-members=true # `State.RUNNING` not `State.running` +disable-timestamp=true +enable-version-header=true +enum-field-as-literal='one' # When a single enum member, make it output a `Literal["..."]` +input-file-type='openapi' +output-model-type='pydantic_v2.BaseModel' +output-datetime-class='datetime' +target-python-version='3.9' +use-annotated=true +use-default=true +use-double-quotes=true +use-schema-description=true # Desc becomes class doc comment +use-standard-collections=true # list[] not List[] +use-subclass-enum=true # enum, not union of Literals +use-union-operator=true # 3.9+annotations, not `Union[]` + +url = "http://0.0.0.0:8080/openapi.json" +output = "cli/api/datamodels/generated.py" diff --git a/airflow/cli/api/__init__.py b/airflowctl/src/airflow/__init__.py similarity index 99% copy from airflow/cli/api/__init__.py copy to airflowctl/src/airflow/__init__.py index 13a83393a91..217e5db9607 100644 --- a/airflow/cli/api/__init__.py +++ b/airflowctl/src/airflow/__init__.py @@ -1,3 +1,4 @@ +# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information diff --git a/airflow/cli/api/client.py b/airflowctl/src/airflow/airflow/api/client.py similarity index 99% rename from airflow/cli/api/client.py rename to airflowctl/src/airflow/airflow/api/client.py index e471bb2f60d..11536634564 100644 --- a/airflow/cli/api/client.py +++ b/airflowctl/src/airflow/airflow/api/client.py @@ -30,7 +30,7 @@ import structlog from platformdirs import user_config_path from uuid6 import uuid7 -from airflow.cli.api.operations import ( +from airflow.api.operations import ( AssetsOperations, BackfillsOperations, ConfigOperations, diff --git a/airflow/cli/api/operations.py b/airflowctl/src/airflow/airflow/api/operations.py similarity index 99% rename from airflow/cli/api/operations.py rename to airflowctl/src/airflow/airflow/api/operations.py index 3ee85cf19e0..f7cbce99100 100644 --- a/airflow/cli/api/operations.py +++ b/airflowctl/src/airflow/airflow/api/operations.py @@ -23,7 +23,7 @@ from typing import TYPE_CHECKING, Any import httpx import structlog -from airflow.cli.api.datamodels._generated import ( +from airflow.api.datamodels.generated import ( AssetAliasCollectionResponse, AssetAliasResponse, AssetCollectionResponse, @@ -57,7 +57,7 @@ from airflow.cli.api.datamodels._generated import ( ) if TYPE_CHECKING: - from airflow.cli.api.client import Client + from airflow.api.client import Client from airflow.utils.state import DagRunState log = structlog.get_logger(logger_name=__name__) diff --git a/airflow/cli/api/__init__.py b/airflowctl/src/airflow/api/__init__.py similarity index 100% copy from airflow/cli/api/__init__.py copy to airflowctl/src/airflow/api/__init__.py diff --git a/airflow/cli/api/datamodels/__init__.py b/airflowctl/src/airflow/api/datamodels/__init__.py similarity index 100% rename from airflow/cli/api/datamodels/__init__.py rename to airflowctl/src/airflow/api/datamodels/__init__.py diff --git a/airflow/cli/api/datamodels/_generated.py b/airflowctl/src/airflow/api/datamodels/generated.py similarity index 100% rename from airflow/cli/api/datamodels/_generated.py rename to airflowctl/src/airflow/api/datamodels/generated.py diff --git a/airflow/cli/api/__init__.py b/airflowctl/src/airflow/ctl/__init__.py similarity index 72% copy from airflow/cli/api/__init__.py copy to airflowctl/src/airflow/ctl/__init__.py index 13a83393a91..4af2905dfd9 100644 --- a/airflow/cli/api/__init__.py +++ b/airflowctl/src/airflow/ctl/__init__.py @@ -14,3 +14,9 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. + +# Pycharm needs to see this line. VSCode/pyright doesn't care about it, but this file needs to exist +# https://github.com/microsoft/pyright/issues/9439#issuecomment-2468990559 +from __future__ import annotations + +__path__ = __import__("pkgutil").extend_path(__path__, __name__) # type: ignore diff --git a/airflowctl/src/airflow/ctl/cli_config.py b/airflowctl/src/airflow/ctl/cli_config.py new file mode 100644 index 00000000000..a6248fdabf2 --- /dev/null +++ b/airflowctl/src/airflow/ctl/cli_config.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Explicit configuration and definition of Airflow CLI commands.""" + +from __future__ import annotations + +import argparse +import os +from collections.abc import Iterable +from typing import Callable, NamedTuple, Union + +from airflow.ctl.utils.module_loading import import_string + +BUILD_DOCS = "BUILDING_AIRFLOW_DOCS" in os.environ + + +def lazy_load_command(import_path: str) -> Callable: + """Create a lazy loader for command.""" + _, _, name = import_path.rpartition(".") + + def command(*args, **kwargs): + func = import_string(import_path) + return func(*args, **kwargs) + + command.__name__ = name + + return command + + +class DefaultHelpParser(argparse.ArgumentParser): + """CustomParser to display help message.""" + + def _check_value(self, action, value): + """Override _check_value and check conditionally added command.""" + super()._check_value(action, value) + + def error(self, message): + """Override error and use print_help instead of print_usage.""" + self.print_help() + self.exit(2, f"\n{self.prog} command error: {message}, see help above.\n") + + +# Used in Arg to enable `None' as a distinct value from "not passed" +_UNSET = object() + + +class Arg: + """Class to keep information about command line argument.""" + + def __init__( + self, + flags=_UNSET, + help=_UNSET, + action=_UNSET, + default=_UNSET, + nargs=_UNSET, + type=_UNSET, + choices=_UNSET, + required=_UNSET, + metavar=_UNSET, + dest=_UNSET, + ): + self.flags = flags + self.kwargs = {} + for k, v in locals().items(): + if k not in ("self", "flags") and v is not _UNSET: + self.kwargs[k] = v + + def add_to_parser(self, parser: argparse.ArgumentParser): + """Add this argument to an ArgumentParser.""" + if "metavar" in self.kwargs and "type" not in self.kwargs: + if self.kwargs["metavar"] == "DIRPATH": + + def type(x): + return self._is_valid_directory(parser, x) + + self.kwargs["type"] = type + parser.add_argument(*self.flags, **self.kwargs) + + def _is_valid_directory(self, parser, arg): + if not os.path.isdir(arg): + parser.error(f"The directory '{arg}' does not exist!") + return arg + + +def positive_int(*, allow_zero): + """Define a positive int type for an argument.""" + + def _check(value): + try: + value = int(value) + if allow_zero and value == 0: + return value + if value > 0: + return value + except ValueError: + pass + raise argparse.ArgumentTypeError(f"invalid positive int value: '{value}'") + + return _check + + +def string_list_type(val): + """Parse comma-separated list and returns list of string (strips whitespace).""" + return [x.strip() for x in val.split(",")] + + +def string_lower_type(val): + """Lower arg.""" + if not val: + return + return val.strip().lower() + + +# Authentication arguments +ARG_AUTH_URL = Arg( + flags=("--api-url",), + type=str, + default="http://localhost:8080", + dest="api_url", + help="The URL of the metadata database API", +) +ARG_AUTH_TOKEN = Arg( + flags=("--api-token",), + type=str, + dest="api_token", + help="The token to use for authentication", +) +ARG_AUTH_ENVIRONMENT = Arg( + flags=("-e", "--env"), + type=str, + default="production", + help="The environment to run the command in", +) + + +class ActionCommand(NamedTuple): + """Single CLI command.""" + + name: str + help: str + func: Callable + args: Iterable[Arg] + description: str | None = None + epilog: str | None = None + hide: bool = False + + +class GroupCommand(NamedTuple): + """ClI command with subcommands.""" + + name: str + help: str + subcommands: Iterable + description: str | None = None + epilog: str | None = None + + +CLICommand = Union[ActionCommand, GroupCommand] + +AUTH_COMMANDS = ( + ActionCommand( + name="login", + help="Login to the metadata database for personal usage. JWT Token must be provided via parameter.", + description="Login to the metadata database", + func=lazy_load_command("airflow.cli.commands.remote_commands.auth_command.login"), + args=(ARG_AUTH_URL, ARG_AUTH_TOKEN, ARG_AUTH_ENVIRONMENT), + ), +) + + +core_commands: list[CLICommand] = [ + GroupCommand( + name="auth", + help="Manage authentication for CLI. Please acquire a token from the api-server first. " + "You need to pass the token to subcommand to use `login`.", + subcommands=AUTH_COMMANDS, + ), +] diff --git a/airflowctl/src/airflow/ctl/cli_parser.py b/airflowctl/src/airflow/ctl/cli_parser.py new file mode 100644 index 00000000000..0064f758a7b --- /dev/null +++ b/airflowctl/src/airflow/ctl/cli_parser.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Produce a CLI parser object from Airflow CLI command configuration. + +.. seealso:: :mod:`airflow.cli.cli_config` +""" + +from __future__ import annotations + +import argparse +import logging +from argparse import Action +from collections import Counter +from collections.abc import Iterable +from functools import cache +from typing import TYPE_CHECKING + +import lazy_object_proxy +from rich_argparse import RawTextRichHelpFormatter, RichHelpFormatter + +from airflow.ctl.cli_config import ( + ActionCommand, + DefaultHelpParser, + GroupCommand, + core_commands, +) +from airflow.ctl.utils import CliConflictError +from airflow.exceptions import AirflowException +from airflow.utils.helpers import partition + +if TYPE_CHECKING: + from airflow.ctl.cli_config import ( + Arg, + CLICommand, + ) + +airflow_commands = core_commands.copy() # make a copy to prevent bad interactions in tests + +log = logging.getLogger(__name__) + + +ALL_COMMANDS_DICT: dict[str, CLICommand] = {sp.name: sp for sp in airflow_commands} + + +# Check if sub-commands are defined twice, which could be an issue. +if len(ALL_COMMANDS_DICT) < len(airflow_commands): + dup = {k for k, v in Counter([c.name for c in airflow_commands]).items() if v > 1} + raise CliConflictError( + f"The following CLI {len(dup)} command(s) are defined more than once: {sorted(dup)}\n" + f"This can be due to an Executor or Auth Manager redefining core airflow CLI commands." + ) + + +class AirflowHelpFormatter(RichHelpFormatter): + """ + Custom help formatter to display help message. + + It displays simple commands and groups of commands in separate sections. + """ + + def _iter_indented_subactions(self, action: Action): + if isinstance(action, argparse._SubParsersAction): + self._indent() + subactions = action._get_subactions() + action_subcommands, group_subcommands = partition( + lambda d: isinstance(ALL_COMMANDS_DICT[d.dest], GroupCommand), subactions + ) + yield Action([], f"\n{' ':{self._current_indent}}Groups", nargs=0) + self._indent() + yield from group_subcommands + self._dedent() + + yield Action([], f"\n{' ':{self._current_indent}}Commands:", nargs=0) + self._indent() + yield from action_subcommands + self._dedent() + self._dedent() + else: + yield from super()._iter_indented_subactions(action) + + +class LazyRichHelpFormatter(RawTextRichHelpFormatter): + """ + Custom help formatter to display help message. + + It resolves lazy help string before printing it using rich. + """ + + def add_argument(self, action: Action) -> None: + if isinstance(action.help, lazy_object_proxy.Proxy): + action.help = str(action.help) + return super().add_argument(action) + + +@cache +def get_parser(dag_parser: bool = False) -> argparse.ArgumentParser: + """Create and returns command line argument parser.""" + parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter) + subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND") + subparsers.required = True + + for _, sub in sorted(ALL_COMMANDS_DICT.items()): + _add_command(subparsers, sub) + return parser + + +def _sort_args(args: Iterable[Arg]) -> Iterable[Arg]: + """Sort subcommand optional args, keep positional args.""" + + def get_long_option(arg: Arg): + """Get long option from Arg.flags.""" + return arg.flags[0] if len(arg.flags) == 1 else arg.flags[1] + + positional, optional = partition(lambda x: x.flags[0].startswith("-"), args) + yield from positional + yield from sorted(optional, key=lambda x: get_long_option(x).lower()) + + +def _add_command(subparsers: argparse._SubParsersAction, sub: CLICommand) -> None: + if isinstance(sub, ActionCommand) and sub.hide: + sub_proc = subparsers.add_parser(sub.name, epilog=sub.epilog) + else: + sub_proc = subparsers.add_parser( + sub.name, help=sub.help, description=sub.description or sub.help, epilog=sub.epilog + ) + sub_proc.formatter_class = LazyRichHelpFormatter + + if isinstance(sub, GroupCommand): + _add_group_command(sub, sub_proc) + elif isinstance(sub, ActionCommand): + _add_action_command(sub, sub_proc) + else: + raise AirflowException("Invalid command definition.") + + +def _add_action_command(sub: ActionCommand, sub_proc: argparse.ArgumentParser) -> None: + for arg in _sort_args(sub.args): + arg.add_to_parser(sub_proc) + sub_proc.set_defaults(func=sub.func) + + +def _add_group_command(sub: GroupCommand, sub_proc: argparse.ArgumentParser) -> None: + subcommands = sub.subcommands + sub_subparsers = sub_proc.add_subparsers(dest="subcommand", metavar="COMMAND") + sub_subparsers.required = True + for command in sorted(subcommands, key=lambda x: x.name): + _add_command(sub_subparsers, command) diff --git a/tests/cli/api/__init__.py b/airflowctl/src/airflow/ctl/commands/__init__.py similarity index 100% rename from tests/cli/api/__init__.py rename to airflowctl/src/airflow/ctl/commands/__init__.py diff --git a/airflow/cli/commands/remote_commands/auth_command.py b/airflowctl/src/airflow/ctl/commands/commands/auth_command.py similarity index 96% rename from airflow/cli/commands/remote_commands/auth_command.py rename to airflowctl/src/airflow/ctl/commands/commands/auth_command.py index 4bb78a530a4..e3ebdc961e2 100644 --- a/airflow/cli/commands/remote_commands/auth_command.py +++ b/airflowctl/src/airflow/ctl/commands/commands/auth_command.py @@ -23,7 +23,7 @@ import sys import rich -from airflow.cli.api.client import Credentials +from airflow.api.client import Credentials from airflow.utils import cli as cli_utils diff --git a/airflowctl/src/airflow/ctl/utils.py b/airflowctl/src/airflow/ctl/utils.py new file mode 100644 index 00000000000..605244ee71e --- /dev/null +++ b/airflowctl/src/airflow/ctl/utils.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import sys +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import datetime + from collections.abc import Collection + from io import IOBase, TextIOWrapper + + from sqlalchemy.orm import Session + + from airflow.models.dagrun import DagRun + + +class CliConflictError(Exception): + """Error for when CLI commands are defined twice by different sources.""" + + pass + + +def is_stdout(fileio: IOBase) -> bool: + """ + Check whether a file IO is stdout. + + The intended use case for this helper is to check whether an argument parsed + with argparse.FileType points to stdout (by setting the path to ``-``). This + is why there is no equivalent for stderr; argparse does not allow using it. + + .. warning:: *fileio* must be open for this check to be successful. + """ + return fileio.fileno() == sys.stdout.fileno() + + +def print_export_output(command_type: str, exported_items: Collection, file: TextIOWrapper): + if not file.closed and is_stdout(file): + print(f"\n{len(exported_items)} {command_type} successfully exported.", file=sys.stderr) + else: + print(f"{len(exported_items)} {command_type} successfully exported to {file.name}.") + + +def fetch_dag_run_from_run_id_or_logical_date_string( + *, + dag_id: str, + value: str, + session: Session, +) -> tuple[DagRun | None, datetime.datetime | None]: + """ + Try to find a DAG run with a given string value. + + The string value may be a run ID, or a logical date in string form. We first + try to use it as a run_id; if a run is found, it is returned as-is. + + Otherwise, the string value is parsed into a datetime. If that works, it is + used to find a DAG run. + + The return value is a two-tuple. The first item is the found DAG run (or + *None* if one cannot be found). The second is the parsed logical date. This + second value can be used to create a new run by the calling function when + one cannot be found here. + """ + from pendulum.parsing.exceptions import ParserError + from sqlalchemy import select + + from airflow.models.dag import DAG + from airflow.models.dagrun import DagRun + from airflow.utils import timezone + + if dag_run := DAG.fetch_dagrun(dag_id=dag_id, run_id=value, session=session): + return dag_run, dag_run.logical_date + try: + logical_date = timezone.parse(value) + except (ParserError, TypeError): + return None, None + dag_run = session.scalar( + select(DagRun) + .where(DagRun.dag_id == dag_id, DagRun.logical_date == logical_date) + .order_by(DagRun.id.desc()) + .limit(1) + ) + return dag_run, logical_date diff --git a/airflow/cli/api/__init__.py b/airflowctl/tests/__init__.py similarity index 100% copy from airflow/cli/api/__init__.py copy to airflowctl/tests/__init__.py diff --git a/airflow/cli/api/__init__.py b/airflowctl/tests/api/__init__.py similarity index 100% copy from airflow/cli/api/__init__.py copy to airflowctl/tests/api/__init__.py diff --git a/tests/cli/api/test_client.py b/airflowctl/tests/api/test_client.py similarity index 97% rename from tests/cli/api/test_client.py rename to airflowctl/tests/api/test_client.py index 9aab92772f6..81fe2f7c1a4 100644 --- a/tests/cli/api/test_client.py +++ b/airflowctl/tests/api/test_client.py @@ -26,8 +26,8 @@ import httpx import pytest from platformdirs import user_config_path -from airflow.cli.api.client import Client, Credentials -from airflow.cli.api.operations import ServerResponseError +from airflow.api.client import Client, Credentials +from airflow.api.operations import ServerResponseError from airflow.exceptions import AirflowNotFoundException diff --git a/tests/cli/api/test_operations.py b/airflowctl/tests/api/test_operations.py similarity index 99% rename from tests/cli/api/test_operations.py rename to airflowctl/tests/api/test_operations.py index b2d28e89f85..b40d6105c55 100644 --- a/tests/cli/api/test_operations.py +++ b/airflowctl/tests/api/test_operations.py @@ -26,8 +26,8 @@ from io import StringIO import httpx import pytest -from airflow.cli.api.client import Client -from airflow.cli.api.datamodels._generated import ( +from airflow.api.client import Client +from airflow.api.datamodels.generated import ( AssetAliasCollectionResponse, AssetAliasResponse, AssetCollectionResponse, diff --git a/airflow/cli/api/__init__.py b/airflowctl/tests/commands/__init__.py similarity index 100% rename from airflow/cli/api/__init__.py rename to airflowctl/tests/commands/__init__.py diff --git a/tests/cli/commands/remote_commands/test_auth_command.py b/airflowctl/tests/commands/test_auth_command.py similarity index 100% rename from tests/cli/commands/remote_commands/test_auth_command.py rename to airflowctl/tests/commands/test_auth_command.py diff --git a/tests/cli/conftest.py b/airflowctl/tests/conftest.py similarity index 59% copy from tests/cli/conftest.py copy to airflowctl/tests/conftest.py index fba13c2b7d9..05d483bbf0f 100644 --- a/tests/cli/conftest.py +++ b/airflowctl/tests/conftest.py @@ -17,53 +17,14 @@ # under the License. from __future__ import annotations -import sys from unittest.mock import patch import httpx import pytest -from airflow.cli.api.client import Client, Credentials -from airflow.executors import local_executor -from airflow.models.dagbag import DagBag -from airflow.providers.celery.executors import celery_executor -from airflow.providers.cncf.kubernetes.executors import kubernetes_executor +from airflow.api.client import Client, Credentials -from tests_common.test_utils.config import conf_vars -# Create custom executors here because conftest is imported first -custom_executor_module = type(sys)("custom_executor") -custom_executor_module.CustomCeleryExecutor = type( # type: ignore - "CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {} -) -custom_executor_module.CustomLocalExecutor = type( # type: ignore - "CustomLocalExecutor", (local_executor.LocalExecutor,), {} -) -custom_executor_module.CustomKubernetesExecutor = type( # type: ignore - "CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {} -) -sys.modules["custom_executor"] = custom_executor_module - - [email protected](autouse=True) -def load_examples(): - with conf_vars({("core", "load_examples"): "True"}): - yield - - [email protected](scope="session") -def dagbag(): - return DagBag(include_examples=True) - - [email protected](scope="session") -def parser(): - from airflow.cli import cli_parser - - return cli_parser.get_parser() - - -# TODO this is not used at the moment but it is part of airflow/cli/api/client.py test suite @pytest.fixture(scope="session") def cli_api_client_maker(client_credentials): """ diff --git a/hatch_build.py b/hatch_build.py index c76e0b38961..341a96d366d 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -222,7 +222,6 @@ DEPENDENCIES = [ "itsdangerous>=2.0", "jinja2>=3.0.0", "jsonschema>=4.18.0", - "keyring>=25.6.0", "lazy-object-proxy>=1.2.0", "libcst >=1.1.0", "linkify-it-py>=2.0.0", @@ -236,7 +235,6 @@ DEPENDENCIES = [ "opentelemetry-exporter-otlp>=1.24.0", "packaging>=23.2", "pathspec>=0.9.0", - "platformdirs>=4.3.6", 'pendulum>=2.1.2,<4.0;python_version<"3.12"', 'pendulum>=3.0.0,<4.0;python_version>="3.12"', "pluggy>=1.5.0", diff --git a/pyproject.toml b/pyproject.toml index 81386edebe8..52d4f5750eb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -317,7 +317,9 @@ testing = ["dev", "providers.tests", "tests_common", "tests"] "providers/cncf/*/src/airflow/providers/cncf/__init__.py" = ["I002"] "providers/dbt/*/src/airflow/providers/dbt/__init__.py" = ["I002"] "providers/microsoft/*/src/airflow/providers/microsoft/__init__.py" = ["I002"] -"airflow/cli/api/datamodels/_generated.py" = ["UP007", "D101", "D200"] + +# TODO (47972) couldn't find the way of pushing without making ruff happy +"airflowctl/src/airflow/api/datamodels/generated.py" = ["UP007", "D101", "D200"] # The test_python.py is needed because adding __future__.annotations breaks runtime checks that are # needed for the test to work @@ -575,6 +577,7 @@ explicit_package_bases = true mypy_path = [ "$MYPY_CONFIG_FILE_DIR", "$MYPY_CONFIG_FILE_DIR/task_sdk/src", + "$MYPY_CONFIG_FILE_DIR/airflowctl/src", ] [[tool.mypy.overrides]] @@ -608,17 +611,16 @@ module=[ ] ignore_errors = true +# TODO check this and create mypy check for airflowctl [[tool.mypy.overrides]] -module="airflow.cli.api.datamodels.*" +module="airflowctl.*" ignore_errors = true [dependency-groups] -codegen = [ - "datamodel-code-generator[http]==0.28.2", -] dev = [ "apache-airflow[aiobotocore,async,apache-atlas,apache-webhdfs,cgroups,cloudpickle,github-enterprise,google-auth,graphviz,kerberos,ldap,leveldb,otel,pandas,password,rabbitmq,s3fs,sentry,statsd,uv]", "apache-airflow-task-sdk", + "apache-airflow-ctl", "apache-airflow-devel-common", "apache-airflow-providers-airbyte", "apache-airflow-providers-alibaba", @@ -724,6 +726,7 @@ no-build-isolation-package = ["sphinx-redoc"] # These names must match the names as defined in the pyproject.toml of the workspace items, # *not* the workspace folder paths apache-airflow = {workspace = true} +apache-airflow-ctl = {workspace = true} apache-airflow-providers-airbyte = {workspace = true} apache-airflow-providers-alibaba = { workspace = true } apache-airflow-providers-amazon = { workspace = true } @@ -918,32 +921,7 @@ members = [ "providers/ydb", "providers/zendesk", ".", + "airflowctl", "task-sdk", "devel-common", ] - - -# To use: -# -# TODO: Automate this in CI via pre-commit hook and generate the file each time -# The API should be running in the background to serve the OpenAPI schema -# uv run --group codegen --project apache-airflow --directory airflow/ datamodel-codegen -[tool.datamodel-codegen] -capitalise-enum-members=true # `State.RUNNING` not `State.running` -disable-timestamp=true -enable-version-header=true -enum-field-as-literal='one' # When a single enum member, make it output a `Literal["..."]` -input-file-type='openapi' -output-model-type='pydantic_v2.BaseModel' -output-datetime-class='datetime' -target-python-version='3.9' -use-annotated=true -use-default=true -use-double-quotes=true -use-schema-description=true # Desc becomes class doc comment -use-standard-collections=true # list[] not List[] -use-subclass-enum=true # enum, not union of Literals -use-union-operator=true # 3.9+annotations, not `Union[]` - -url = "http://0.0.0.0:8080/openapi.json" -output = "cli/api/datamodels/_generated.py" diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/airflow/api/datamodels/generated.py similarity index 100% rename from task-sdk/src/airflow/sdk/api/datamodels/_generated.py rename to task-sdk/src/airflow/sdk/airflow/api/datamodels/generated.py diff --git a/tests/cli/conftest.py b/tests/cli/conftest.py index fba13c2b7d9..5c46e0aa01b 100644 --- a/tests/cli/conftest.py +++ b/tests/cli/conftest.py @@ -18,12 +18,9 @@ from __future__ import annotations import sys -from unittest.mock import patch -import httpx import pytest -from airflow.cli.api.client import Client, Credentials from airflow.executors import local_executor from airflow.models.dagbag import DagBag from airflow.providers.celery.executors import celery_executor @@ -61,34 +58,3 @@ def parser(): from airflow.cli import cli_parser return cli_parser.get_parser() - - -# TODO this is not used at the moment but it is part of airflow/cli/api/client.py test suite [email protected](scope="session") -def cli_api_client_maker(client_credentials): - """ - Create a CLI API client with a custom transport and returns callable to create a client with a custom transport - """ - - def make_cli_api_client(transport: httpx.MockTransport) -> Client: - """Get a client with a custom transport""" - return Client(base_url="test://server", transport=transport, token="") - - def _cli_api_client(path: str, response_json: dict, expected_http_status_code: int) -> Client: - """Get a client with a custom transport""" - - def handle_request(request: httpx.Request) -> httpx.Response: - """Handle the request and return a response""" - assert request.url.path == path - return httpx.Response(expected_http_status_code, json=response_json) - - return make_cli_api_client(transport=httpx.MockTransport(handle_request)) - - return _cli_api_client - - [email protected](scope="session") -def client_credentials(): - """Create credentials for CLI API""" - with patch("airflow.cli.api.client.keyring"): - Credentials(api_url="http://localhost:9091", api_token="NO_TOKEN").save()
