bugraoz93 commented on code in PR #68444:
URL: https://github.com/apache/airflow/pull/68444#discussion_r3408433313
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -522,62 +488,60 @@ def iter_next_dagrun_info() -> Iterator[DagRunInfo |
None]:
@cli_utils.action_cli
+@deprecated_for_airflowctl("airflowctl dags list")
@suppress_logs_and_warning
@providers_configuration_loaded
-@provide_session
-def dag_list_dags(args, *, session: Session = NEW_SESSION) -> None:
- """Display dags with or without stats at the command line."""
+def dag_list_dags(args) -> None:
+ """Display Dags with or without stats at the command line."""
cols = args.columns if args.columns else []
if invalid_cols := [c for c in cols if c not in DAG_DETAIL_FIELDS]:
- from rich import print as rich_print
-
rich_print(
f"[red][bold]Error:[/bold] Ignoring the following invalid columns:
{invalid_cols}. "
f"List of valid columns: {sorted(DAG_DETAIL_FIELDS)}",
file=sys.stderr,
)
- dagbag_import_errors = 0
- dags_list = []
if args.local:
- from airflow.dag_processing.dagbag import DagBag
+ _list_local_dags(args, cols=cols)
+ else:
+ _list_dags_from_api(args, cols=cols)
- # Get import errors from the local area
- if args.bundle_name:
- manager = DagBundlesManager()
- validate_dag_bundle_arg(args.bundle_name)
- all_bundles = list(manager.get_all_dag_bundles())
- bundles_to_search = set(args.bundle_name)
- for bundle in all_bundles:
- if bundle.name in bundles_to_search:
- bundle_dagbag = BundleDagBag(
- bundle.path, bundle_path=bundle.path,
bundle_name=bundle.name
- )
- bundle_dagbag.collect_dags()
- dags_list.extend(list(bundle_dagbag.dags.values()))
- dagbag_import_errors += len(bundle_dagbag.import_errors)
- else:
- dagbag = DagBag()
- dagbag.collect_dags()
- dags_list.extend(list(dagbag.dags.values()))
- dagbag_import_errors += len(dagbag.import_errors)
+def _print_dag_import_error_warning() -> None:
+ rich_print(
+ "[red][bold]Error:[/bold] Failed to load all files. "
+ "For details, run `airflow dags list-import-errors`",
+ file=sys.stderr,
+ )
+
+
+@provide_session
+def _list_local_dags(args, cols: list[str] | tuple[str, ...], *, session:
Session = NEW_SESSION) -> None:
+ """List Dags parsed from local Dag bundles."""
+ dagbag_import_errors = 0
+ dags_list = []
+
+ if args.bundle_name:
+ manager = DagBundlesManager()
Review Comment:
This should use the airflowctl client rather than loading DagBundlesManager
and getting the bundles from there. When we say api, it should go through the
new airflowctl client defined
So it will already call airflowctl behind the secene while giving
deprication warnings
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/cli/api_client.py
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]