This is an automated email from the ASF dual-hosted git repository. weilee pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 79a9c828698 Add 'airflow assets details' subcommand (#44445) 79a9c828698 is described below commit 79a9c828698c0eba9908ff8d3d6073bfa040494f Author: Tzu-ping Chung <uranu...@gmail.com> AuthorDate: Fri Nov 29 10:05:29 2024 +0800 Add 'airflow assets details' subcommand (#44445) * Add 'airflow assets details' subcommand * Limit unnecessary assets fetched --- airflow/cli/cli_config.py | 9 +++++++++ airflow/cli/commands/asset_command.py | 32 ++++++++++++++++++++++++++++++++ tests/cli/commands/test_asset_command.py | 26 ++++++++++++++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 0c9703ec780..1d113a00ed9 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -935,6 +935,9 @@ ARG_ASSET_LIST_COLUMNS = Arg( default=("name", "uri", "group", "extra"), ) +ARG_ASSET_NAME = Arg(("--name",), help="Asset name") +ARG_ASSET_URI = Arg(("--uri",), help="Asset URI") + ALTERNATIVE_CONN_SPECS_ARGS = [ ARG_CONN_TYPE, ARG_CONN_DESCRIPTION, @@ -977,6 +980,12 @@ ASSETS_COMMANDS = ( func=lazy_load_command("airflow.cli.commands.asset_command.asset_list"), args=(ARG_OUTPUT, ARG_VERBOSE, ARG_ASSET_LIST_COLUMNS), ), + ActionCommand( + name="details", + help="Show asset details", + func=lazy_load_command("airflow.cli.commands.asset_command.asset_details"), + args=(ARG_ASSET_NAME, ARG_ASSET_URI, ARG_OUTPUT, ARG_VERBOSE), + ), ) BACKFILL_COMMANDS = ( ActionCommand( diff --git a/airflow/cli/commands/asset_command.py b/airflow/cli/commands/asset_command.py index a43fe409021..5a8e67b5845 100644 --- a/airflow/cli/commands/asset_command.py +++ b/airflow/cli/commands/asset_command.py @@ -51,3 +51,35 @@ def asset_list(args, *, session: Session = NEW_SESSION) -> None: output=args.output, mapper=detail_mapper, ) + + +@cli_utils.action_cli +@provide_session +def asset_details(args, *, session: Session = NEW_SESSION) -> None: + """Display details of an asset.""" + if not args.name and not args.uri: + raise SystemExit("Either --name or --uri is required") + + stmt = select(AssetModel) + select_message_parts = [] + if args.name: + stmt = stmt.where(AssetModel.name == args.name) + select_message_parts.append(f"name {args.name}") + if args.uri: + stmt = stmt.where(AssetModel.uri == args.uri) + select_message_parts.append(f"URI {args.uri}") + asset_it = iter(session.scalars(stmt.limit(2))) + select_message = " and ".join(select_message_parts) + + if (asset := next(asset_it, None)) is None: + raise SystemExit(f"Asset with {select_message} does not exist.") + if next(asset_it, None) is not None: + raise SystemExit(f"More than one asset exists with {select_message}.") + + model_data = AssetResponse.model_validate(asset).model_dump() + if args.output in ["table", "plain"]: + data = [{"property_name": key, "property_value": value} for key, value in model_data.items()] + else: + data = [model_data] + + AirflowConsole().print_as(data=data, output=args.output) diff --git a/tests/cli/commands/test_asset_command.py b/tests/cli/commands/test_asset_command.py index 0cb4d8dbd84..cc5a4f1cda8 100644 --- a/tests/cli/commands/test_asset_command.py +++ b/tests/cli/commands/test_asset_command.py @@ -63,3 +63,29 @@ def test_cli_assets_list(parser: ArgumentParser) -> None: assert "group" in asset_list[0] assert "extra" in asset_list[0] assert any(asset["uri"] == "s3://dag1/output_1.txt" for asset in asset_list) + + +def test_cli_assets_details(parser: ArgumentParser) -> None: + args = parser.parse_args(["assets", "details", "--name=asset1_producer", "--output=json"]) + with contextlib.redirect_stdout(io.StringIO()) as temp_stdout: + asset_command.asset_details(args) + + asset_detail_list = json.loads(temp_stdout.getvalue()) + assert len(asset_detail_list) == 1 + + # No good way to statically compare these. + undeterministic = { + "id": None, + "created_at": None, + "updated_at": None, + "consuming_dags": None, + "producing_tasks": None, + } + + assert asset_detail_list[0] | undeterministic == undeterministic | { + "name": "asset1_producer", + "uri": "s3://bucket/asset1_producer", + "group": "asset", + "extra": {}, + "aliases": [], + }