This is an automated email from the ASF dual-hosted git repository. dimberman pushed a commit to branch 1-10-yaml-generator in repository https://gitbox.apache.org/repos/asf/airflow.git
commit ca376853360c6fcbed033122e1ca11ad1d19d429 Author: Daniel Imberman <daniel.imber...@gmail.com> AuthorDate: Wed Sep 2 18:07:26 2020 -0700 YAML Generation function (cherry picked from commit 1d49f62589f981e64fd58ddc39ad78717590bc26) --- airflow/bin/cli.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 82162f2..619e305 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -19,6 +19,7 @@ # under the License. from __future__ import print_function +import datetime import errno import importlib import logging @@ -1491,6 +1492,43 @@ def list_users(args): msg = msg.encode('utf-8') print(msg) +@cli_utils.action_logging +def generate_kubernetes_pod_yaml(args): + from airflow.executors.kubernetes_executor import AirflowKubernetesScheduler, KubeConfig + from airflow.kubernetes.pod_generator import PodGenerator + from airflow.kubernetes.worker_configuration import WorkerConfiguration + execution_date = datetime.datetime(2020, 11, 3) + dag = get_dag(args) + yaml_output_path = args.output_path or "/tmp/airflow_generated_yaml/" + kube_config = KubeConfig() + for task in dag.tasks: + ti = TaskInstance(task, execution_date) + pod = PodGenerator.construct_pod( + dag_id=args.dag_id, + task_id=ti.task_id, + pod_id=AirflowKubernetesScheduler._create_pod_id( # pylint: disable=W0212 + args.dag_id, ti.task_id), + try_number=ti.try_number, + date=ti.execution_date, + command=ti.command_as_list(), + kube_executor_config=PodGenerator.from_obj(ti.executor_config), + worker_uuid="worker-config", + namespace=kube_config.executor_namespace, + worker_config=WorkerConfiguration(kube_config=kube_config).as_pod() + ) + import os + + import yaml + from kubernetes.client.api_client import ApiClient + api_client = ApiClient() + date_string = AirflowKubernetesScheduler._datetime_to_label_safe_datestring( # pylint: disable=W0212 + execution_date) + yaml_file_name = f"{args.dag_id}_{ti.task_id}_{date_string}.yml"
+ os.makedirs(os.path.dirname(yaml_output_path), exist_ok=True) + with open(yaml_output_path + yaml_file_name, "w") as output: + sanitized_pod = api_client.sanitize_for_serialization(pod) + output.write(yaml.dump(sanitized_pod)) + print(f"YAML output can be found at {yaml_output_path}") @cli_utils.action_logging def list_dag_runs(args, dag=None): @@ -1581,6 +1619,11 @@ class CLIFactory(object): 'execution_date': Arg( ("execution_date",), help="The execution date of the DAG", type=parsedate), + 'output_path': Arg( + ('-o', '--output-path'), + help="output path for yaml file", + default="/tmp/airflow_yaml_output/" + ), 'task_regex': Arg( ("-t", "--task_regex"), "The regex to filter specific task_ids to backfill (optional)"), @@ -2076,6 +2119,16 @@ class CLIFactory(object): 'dag_id', 'no_backfill', 'state' ) }, { + 'func': generate_kubernetes_pod_yaml, + 'help': "Generate YAML files for all tasks in the given DAG. " + "Useful for debugging tasks of the KubernetesExecutor.", + 'args': ( + 'dag_id', 'output_path', 'subdir' + ) + + }, { 'func': list_tasks, 'help': "List the tasks within a DAG", 'args': ('dag_id', 'tree', 'subdir'),