This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 829f19f Updating core example DAGs to use TaskFlow API where applicable (#18562) 829f19f is described below commit 829f19f94ea88db1825261ef2fd4920f8a9558eb Author: Josh Fell <48934154+josh-f...@users.noreply.github.com> AuthorDate: Wed Oct 13 09:24:50 2021 -0400 Updating core example DAGs to use TaskFlow API where applicable (#18562) --- airflow/example_dags/example_branch_operator.py | 3 +- airflow/example_dags/example_complex.py | 3 +- airflow/example_dags/example_dag_decorator.py | 2 +- .../example_dags/example_kubernetes_executor.py | 93 +++++----- .../example_kubernetes_executor_config.py | 203 ++++++++++----------- airflow/example_dags/example_nested_branch_dag.py | 5 +- .../example_passing_params_via_test_command.py | 20 +- airflow/example_dags/example_python_operator.py | 43 ++--- airflow/example_dags/example_skip_dag.py | 15 +- airflow/example_dags/example_trigger_target_dag.py | 11 +- airflow/example_dags/example_xcom.py | 48 ++--- airflow/example_dags/example_xcomargs.py | 16 +- .../endpoints/test_task_instance_endpoint.py | 4 +- 13 files changed, 222 insertions(+), 244 deletions(-) diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py index afc0340..69f939e 100644 --- a/airflow/example_dags/example_branch_operator.py +++ b/airflow/example_dags/example_branch_operator.py @@ -25,6 +25,7 @@ from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.operators.python import BranchPythonOperator from airflow.utils.edgemodifier import Label +from airflow.utils.trigger_rule import TriggerRule with DAG( dag_id='example_branch_operator', @@ -47,7 +48,7 @@ with DAG( join = DummyOperator( task_id='join', - trigger_rule='none_failed_min_one_success', + trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS, ) for option in options: diff --git a/airflow/example_dags/example_complex.py 
b/airflow/example_dags/example_complex.py index 542dee8..a141236 100644 --- a/airflow/example_dags/example_complex.py +++ b/airflow/example_dags/example_complex.py @@ -24,7 +24,6 @@ from datetime import datetime from airflow import models from airflow.models.baseoperator import chain from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator with models.DAG( dag_id="example_complex", @@ -131,7 +130,7 @@ with models.DAG( ) # Search - search_catalog = PythonOperator(task_id="search_catalog", python_callable=lambda: print("search_catalog")) + search_catalog = BashOperator(task_id="search_catalog", bash_command="echo search_catalog") search_catalog_result = BashOperator( task_id="search_catalog_result", bash_command="echo search_catalog_result" diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py index 2a74988..66b0fa4 100644 --- a/airflow/example_dags/example_dag_decorator.py +++ b/airflow/example_dags/example_dag_decorator.py @@ -26,7 +26,7 @@ from airflow.operators.email import EmailOperator class GetRequestOperator(BaseOperator): - """Custom operator to sand GET request to provided url""" + """Custom operator to send GET request to provided url""" def __init__(self, *, url: str, **kwargs): super().__init__(**kwargs) diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py index ded6e97..b7ca2de 100644 --- a/airflow/example_dags/example_kubernetes_executor.py +++ b/airflow/example_dags/example_kubernetes_executor.py @@ -22,8 +22,8 @@ import os from datetime import datetime from airflow import DAG +from airflow.decorators import task from airflow.example_dags.libs.helper import print_stuff -from airflow.operators.python import PythonOperator with DAG( dag_id='example_kubernetes_executor', @@ -32,6 +32,33 @@ with DAG( catchup=False, tags=['example', 'example2'], ) as dag: + # You don't have to use any special 
KubernetesExecutor configuration if you don't want to + @task + def start_task(): + print_stuff() + + # But you can if you want to + kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}} + + @task(executor_config=kube_exec_config_special) + def one_task(): + print_stuff() + + # Use the zip binary, which is only found in this special docker image + kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}} + + @task(task_id="two_task", executor_config=kube_exec_config_zip_binary) + def assert_zip_binary(): + """ + Checks whether Zip is installed. + + :raises SystemError: if zip is not installed + """ + return_code = os.system("zip") + if return_code != 0: + raise SystemError("The zip binary is not found") + + # Limit resources on this operator/task with node affinity & tolerations affinity = { 'podAntiAffinity': { 'requiredDuringSchedulingIgnoredDuringExecution': [ @@ -47,52 +74,30 @@ with DAG( tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}] - def assert_zip_binary(): - """ - Checks whether Zip is installed. 
- - :raises SystemError: if zip is not installed - """ - return_code = os.system("zip") - if return_code != 0: - raise SystemError("The zip binary is not found") - - # You don't have to use any special KubernetesExecutor configuration if you don't want to - start_task = PythonOperator(task_id="start_task", python_callable=print_stuff) + kube_exec_config_resource_limits = { + "KubernetesExecutor": { + "request_memory": "128Mi", + "limit_memory": "128Mi", + "tolerations": tolerations, + "affinity": affinity, + } + } - # But you can if you want to - one_task = PythonOperator( - task_id="one_task", - python_callable=print_stuff, - executor_config={"KubernetesExecutor": {"image": "airflow/ci:latest"}}, - ) + @task(executor_config=kube_exec_config_resource_limits) + def three_task(): + print_stuff() - # Use the zip binary, which is only found in this special docker image - two_task = PythonOperator( - task_id="two_task", - python_callable=assert_zip_binary, - executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}, - ) + # Add arbitrary labels to worker pods + kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}} - # Limit resources on this operator/task with node affinity & tolerations - three_task = PythonOperator( - task_id="three_task", - python_callable=print_stuff, - executor_config={ - "KubernetesExecutor": { - "request_memory": "128Mi", - "limit_memory": "128Mi", - "tolerations": tolerations, - "affinity": affinity, - } - }, - ) + @task(executor_config=kube_exec_config_pod_labels) + def four_task(): + print_stuff() - # Add arbitrary labels to worker pods - four_task = PythonOperator( - task_id="four_task", - python_callable=print_stuff, - executor_config={"KubernetesExecutor": {"labels": {"foo": "bar"}}}, - ) + start_task = start_task() + one_task = one_task() + two_task = assert_zip_binary() + three_task = three_task() + four_task = four_task() start_task >> [one_task, two_task, three_task, four_task] diff --git 
a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py index 88fced8..87077c9 100644 --- a/airflow/example_dags/example_kubernetes_executor_config.py +++ b/airflow/example_dags/example_kubernetes_executor_config.py @@ -23,8 +23,8 @@ import os from datetime import datetime from airflow import DAG +from airflow.decorators import task from airflow.example_dags.libs.helper import print_stuff -from airflow.operators.python import PythonOperator from airflow.settings import AIRFLOW_HOME log = logging.getLogger(__name__) @@ -39,20 +39,40 @@ try: catchup=False, tags=['example3'], ) as dag: + # You can use annotations on your kubernetes pods! + start_task_executor_config = { + "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"})) + } - def test_sharedvolume_mount(): - """ - Tests whether the volume has been mounted. - """ - for i in range(5): - try: - return_code = os.system("cat /shared/test.txt") - if return_code != 0: - raise ValueError(f"Error when checking volume mount. Return code {return_code}") - except ValueError as e: - if i > 4: - raise e + @task(executor_config=start_task_executor_config) + def start_task(): + print_stuff() + + start_task = start_task() + + # [START task_with_volume] + executor_config_volume_mount = { + "pod_override": k8s.V1Pod( + spec=k8s.V1PodSpec( + containers=[ + k8s.V1Container( + name="base", + volume_mounts=[ + k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume") + ], + ) + ], + volumes=[ + k8s.V1Volume( + name="example-kubernetes-test-volume", + host_path=k8s.V1HostPathVolumeSource(path="/tmp/"), + ) + ], + ) + ), + } + @task(executor_config=executor_config_volume_mount) def test_volume_mount(): """ Tests whether the volume has been mounted. @@ -64,104 +84,83 @@ try: if return_code != 0: raise ValueError(f"Error when checking volume mount. 
Return code {return_code}") - # You can use annotations on your kubernetes pods! - start_task = PythonOperator( - task_id="start_task", - python_callable=print_stuff, - executor_config={ - "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"})) - }, - ) - - # [START task_with_volume] - volume_task = PythonOperator( - task_id="task_with_volume", - python_callable=test_volume_mount, - executor_config={ - "pod_override": k8s.V1Pod( - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - volume_mounts=[ - k8s.V1VolumeMount( - mount_path="/foo/", name="example-kubernetes-test-volume" - ) - ], - ) - ], - volumes=[ - k8s.V1Volume( - name="example-kubernetes-test-volume", - host_path=k8s.V1HostPathVolumeSource(path="/tmp/"), - ) - ], - ) - ), - }, - ) + volume_task = test_volume_mount() # [END task_with_volume] # [START task_with_template] - task_with_template = PythonOperator( - task_id="task_with_template", - python_callable=print_stuff, - executor_config={ - "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"), - "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})), - }, - ) + executor_config_template = { + "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"), + "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})), + } + + @task(executor_config=executor_config_template) + def task_with_template(): + print_stuff() + + task_with_template = task_with_template() # [END task_with_template] # [START task_with_sidecar] - sidecar_task = PythonOperator( - task_id="task_with_sidecar", - python_callable=test_sharedvolume_mount, - executor_config={ - "pod_override": k8s.V1Pod( - spec=k8s.V1PodSpec( - containers=[ - k8s.V1Container( - name="base", - volume_mounts=[ - k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir") - ], - ), - k8s.V1Container( - name="sidecar", - image="ubuntu", - args=["echo 
\"retrieved from mount\" > /shared/test.txt"], - command=["bash", "-cx"], - volume_mounts=[ - k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir") - ], - ), - ], - volumes=[ - k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()), - ], - ) - ), - }, - ) + executor_config_sidecar = { + "pod_override": k8s.V1Pod( + spec=k8s.V1PodSpec( + containers=[ + k8s.V1Container( + name="base", + volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")], + ), + k8s.V1Container( + name="sidecar", + image="ubuntu", + args=["echo \"retrieved from mount\" > /shared/test.txt"], + command=["bash", "-cx"], + volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")], + ), + ], + volumes=[ + k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()), + ], + ) + ), + } + + @task(executor_config=executor_config_sidecar) + def test_sharedvolume_mount(): + """ + Tests whether the volume has been mounted. + """ + for i in range(5): + try: + return_code = os.system("cat /shared/test.txt") + if return_code != 0: + raise ValueError(f"Error when checking volume mount. 
Return code {return_code}") + except ValueError as e: + if i > 4: + raise e + + sidecar_task = test_sharedvolume_mount() # [END task_with_sidecar] # Test that we can add labels to pods - third_task = PythonOperator( - task_id="non_root_task", - python_callable=print_stuff, - executor_config={ - "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})) - }, - ) - - other_ns_task = PythonOperator( - task_id="other_namespace_task", - python_callable=print_stuff, - executor_config={ - "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}} - }, - ) + executor_config_non_root = { + "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})) + } + + @task(executor_config=executor_config_non_root) + def non_root_task(): + print_stuff() + + third_task = non_root_task() + + executor_config_other_ns = { + "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}} + } + + @task(executor_config=executor_config_other_ns) + def other_namespace_task(): + print_stuff() + + other_ns_task = other_namespace_task() start_task >> volume_task >> third_task start_task >> other_ns_task diff --git a/airflow/example_dags/example_nested_branch_dag.py b/airflow/example_dags/example_nested_branch_dag.py index cd036b0..add81a9 100644 --- a/airflow/example_dags/example_nested_branch_dag.py +++ b/airflow/example_dags/example_nested_branch_dag.py @@ -26,6 +26,7 @@ from datetime import datetime from airflow.models import DAG from airflow.operators.dummy import DummyOperator from airflow.operators.python import BranchPythonOperator +from airflow.utils.trigger_rule import TriggerRule with DAG( dag_id="example_nested_branch_dag", @@ -35,11 +36,11 @@ with DAG( tags=["example"], ) as dag: branch_1 = BranchPythonOperator(task_id="branch_1", python_callable=lambda: "true_1") - join_1 = DummyOperator(task_id="join_1", trigger_rule="none_failed_min_one_success") + join_1 = 
DummyOperator(task_id="join_1", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) true_1 = DummyOperator(task_id="true_1") false_1 = DummyOperator(task_id="false_1") branch_2 = BranchPythonOperator(task_id="branch_2", python_callable=lambda: "true_2") - join_2 = DummyOperator(task_id="join_2", trigger_rule="none_failed_min_one_success") + join_2 = DummyOperator(task_id="join_2", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS) true_2 = DummyOperator(task_id="true_2") false_2 = DummyOperator(task_id="false_2") false_3 = DummyOperator(task_id="false_3") diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py index 41930d2..e3f04c4 100644 --- a/airflow/example_dags/example_passing_params_via_test_command.py +++ b/airflow/example_dags/example_passing_params_via_test_command.py @@ -23,11 +23,12 @@ from datetime import datetime, timedelta from textwrap import dedent from airflow import DAG +from airflow.decorators import task from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator -def my_py_command(test_mode, params): +@task(task_id="run_this") +def my_py_command(params, test_mode=None, task=None): """ Print out the "foo" param passed in via `airflow tasks test example_passing_params_via_test_command run_this <date> @@ -37,7 +38,7 @@ def my_py_command(test_mode, params): print( " 'foo' was passed in via test={} command : kwargs[params][foo] \ = {}".format( - test_mode, params["foo"] + test_mode, task.params["foo"] ) ) # Print out the value of "miff", passed in below via the Python Operator @@ -45,7 +46,8 @@ def my_py_command(test_mode, params): return 1 -def print_env_vars(test_mode): +@task(task_id="env_var_test_task") +def print_env_vars(test_mode=None): """ Print out the "foo" param passed in via `airflow tasks test example_passing_params_via_test_command env_var_test_task <date> @@ -64,6 +66,8 @@ with DAG( 
dagrun_timeout=timedelta(minutes=4), tags=['example'], ) as dag: + run_this = my_py_command(params={"miff": "agg"}) + my_templated_command = dedent( """ echo " 'foo was passed in via Airflow CLI Test command with value {{ params.foo }} " @@ -71,18 +75,12 @@ with DAG( """ ) - run_this = PythonOperator( - task_id='run_this', - python_callable=my_py_command, - params={"miff": "agg"}, - ) - also_run_this = BashOperator( task_id='also_run_this', bash_command=my_templated_command, params={"miff": "agg"}, ) - env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars) + env_var_test_task = print_env_vars() run_this >> also_run_this diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py index 50ca185..8d5ce59 100644 --- a/airflow/example_dags/example_python_operator.py +++ b/airflow/example_dags/example_python_operator.py @@ -16,13 +16,16 @@ # specific language governing permissions and limitations # under the License. -"""Example DAG demonstrating the usage of the PythonOperator.""" +""" +Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a +virtual environment. 
+""" import time from datetime import datetime from pprint import pprint from airflow import DAG -from airflow.operators.python import PythonOperator, PythonVirtualenvOperator +from airflow.decorators import task with DAG( dag_id='example_python_operator', @@ -32,35 +35,34 @@ with DAG( tags=['example'], ) as dag: # [START howto_operator_python] - def print_context(ds, **kwargs): + @task(task_id="print_the_context") + def print_context(ds=None, **kwargs): """Print the Airflow context and ds variable from the context.""" pprint(kwargs) print(ds) return 'Whatever you return gets printed in the logs' - run_this = PythonOperator( - task_id='print_the_context', - python_callable=print_context, - ) + run_this = print_context() # [END howto_operator_python] # [START howto_operator_python_kwargs] - def my_sleeping_function(random_base): - """This is a function that will run within the DAG execution""" - time.sleep(random_base) - # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively for i in range(5): - task = PythonOperator( - task_id='sleep_for_' + str(i), - python_callable=my_sleeping_function, - op_kwargs={'random_base': float(i) / 10}, - ) - run_this >> task + @task(task_id=f'sleep_for_{i}') + def my_sleeping_function(random_base): + """This is a function that will run within the DAG execution""" + time.sleep(random_base) + + sleeping_task = my_sleeping_function(random_base=float(i) / 10) + + run_this >> sleeping_task # [END howto_operator_python_kwargs] # [START howto_operator_python_venv] + @task.virtualenv( + task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False + ) def callable_virtualenv(): """ Example function that will be performed in a virtual environment. 
@@ -81,10 +83,5 @@ with DAG( sleep(10) print('Finished') - virtualenv_task = PythonVirtualenvOperator( - task_id="virtualenv_python", - python_callable=callable_virtualenv, - requirements=["colorama==0.4.0"], - system_site_packages=False, - ) + virtualenv_task = callable_virtualenv() # [END howto_operator_python_venv] diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py index ab8c8df..cb664e7 100644 --- a/airflow/example_dags/example_skip_dag.py +++ b/airflow/example_dags/example_skip_dag.py @@ -23,6 +23,7 @@ from datetime import datetime from airflow import DAG from airflow.exceptions import AirflowSkipException from airflow.operators.dummy import DummyOperator +from airflow.utils.trigger_rule import TriggerRule # Create some placeholder operators @@ -35,7 +36,7 @@ class DummySkipOperator(DummyOperator): raise AirflowSkipException -def create_test_pipeline(suffix, trigger_rule, dag_): +def create_test_pipeline(suffix, trigger_rule): """ Instantiate a number of operators for the given DAG. 
@@ -43,10 +44,10 @@ def create_test_pipeline(suffix, trigger_rule, dag_): :param str trigger_rule: TriggerRule for the join task :param DAG dag_: The DAG to run the operators on """ - skip_operator = DummySkipOperator(task_id=f'skip_operator_{suffix}', dag=dag_) - always_true = DummyOperator(task_id=f'always_true_{suffix}', dag=dag_) - join = DummyOperator(task_id=trigger_rule, dag=dag_, trigger_rule=trigger_rule) - final = DummyOperator(task_id=f'final_{suffix}', dag=dag_) + skip_operator = DummySkipOperator(task_id=f'skip_operator_{suffix}') + always_true = DummyOperator(task_id=f'always_true_{suffix}') + join = DummyOperator(task_id=trigger_rule, trigger_rule=trigger_rule) + final = DummyOperator(task_id=f'final_{suffix}') skip_operator >> join always_true >> join @@ -54,5 +55,5 @@ def create_test_pipeline(suffix, trigger_rule, dag_): with DAG(dag_id='example_skip_dag', start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) as dag: - create_test_pipeline('1', 'all_success', dag) - create_test_pipeline('2', 'one_success', dag) + create_test_pipeline('1', TriggerRule.ALL_SUCCESS) + create_test_pipeline('2', TriggerRule.ONE_SUCCESS) diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py index 32e06a3..41aecf1 100644 --- a/airflow/example_dags/example_trigger_target_dag.py +++ b/airflow/example_dags/example_trigger_target_dag.py @@ -24,18 +24,19 @@ Example usage of the TriggerDagRunOperator. This example holds 2 DAGs: from datetime import datetime from airflow import DAG +from airflow.decorators import task from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator -def run_this_func(dag_run): +@task(task_id="run_this") +def run_this_func(dag_run=None): """ Print the payload "message" passed to the DagRun conf attribute. 
:param dag_run: The DagRun object :type dag_run: DagRun """ - print(f"Remotely received value of {dag_run.conf['message']} for key=message") + print(f"Remotely received value of {dag_run.conf.get('message')} for key=message") with DAG( @@ -45,10 +46,10 @@ with DAG( schedule_interval=None, tags=['example'], ) as dag: - run_this = PythonOperator(task_id="run_this", python_callable=run_this_func) + run_this = run_this_func() bash_task = BashOperator( task_id="bash_task", bash_command='echo "Here is the message: $message"', - env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'}, + env={'message': '{{ dag_run.conf.get("message") }}'}, ) diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py index a8c5d99..405d5c5 100644 --- a/airflow/example_dags/example_xcom.py +++ b/airflow/example_dags/example_xcom.py @@ -20,19 +20,21 @@ from datetime import datetime from airflow import DAG +from airflow.decorators import task from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator value_1 = [1, 2, 3] value_2 = {'a': 'b'} -def push(**kwargs): +@task +def push(ti=None): """Pushes an XCom without a specific target""" - kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1) + ti.xcom_push(key='value from pusher 1', value=value_1) -def push_by_returning(**kwargs): +@task +def push_by_returning(): """Pushes an XCom without a specific target, just by returning it""" return value_2 @@ -42,16 +44,17 @@ def _compare_values(pulled_value, check_value): raise ValueError(f'The two values differ {pulled_value} and {check_value}') -def puller(pulled_value_1, pulled_value_2, **kwargs): +@task +def puller(pulled_value_2, ti=None): """Pull all previously pushed XComs and check if the pushed values match the pulled values.""" + pulled_value_1 = ti.xcom_pull(task_ids="push", key="value from pusher 1") - # Check pulled values from function args _compare_values(pulled_value_1, value_1) 
_compare_values(pulled_value_2, value_2) -def pull_value_from_bash_push(**kwargs): - ti = kwargs['ti'] +@task +def pull_value_from_bash_push(ti=None): bash_pushed_via_return_value = ti.xcom_pull(key="return_value", task_ids='bash_push') bash_manually_pushed_value = ti.xcom_pull(key="manually_pushed_value", task_ids='bash_push') print(f"The xcom value pushed by task push via return value is {bash_pushed_via_return_value}") @@ -65,25 +68,6 @@ with DAG( catchup=False, tags=['example'], ) as dag: - push1 = PythonOperator( - task_id='push', - python_callable=push, - ) - - push2 = PythonOperator( - task_id='push_by_returning', - python_callable=push_by_returning, - ) - - pull = PythonOperator( - task_id='puller', - python_callable=puller, - op_kwargs={ - 'pulled_value_1': push1.output['value from pusher 1'], - 'pulled_value_2': push2.output, - }, - ) - bash_push = BashOperator( task_id='bash_push', bash_command='echo "bash_push demo" && ' @@ -101,13 +85,11 @@ with DAG( do_xcom_push=False, ) - python_pull_from_bash = PythonOperator( - task_id='python_pull_from_bash', - python_callable=pull_value_from_bash_push, - ) + python_pull_from_bash = pull_value_from_bash_push() [bash_pull, python_pull_from_bash] << bash_push + puller(push_by_returning()) << push() + # Task dependencies created via `XComArgs`: - # push1 >> pull - # push2 >> pull + # pull << push2 diff --git a/airflow/example_dags/example_xcomargs.py b/airflow/example_dags/example_xcomargs.py index 382e9af..7e0cdd9 100644 --- a/airflow/example_dags/example_xcomargs.py +++ b/airflow/example_dags/example_xcomargs.py @@ -23,21 +23,20 @@ from datetime import datetime from airflow import DAG from airflow.decorators import task from airflow.operators.bash import BashOperator -from airflow.operators.python import PythonOperator, get_current_context log = logging.getLogger(__name__) +@task def generate_value(): """Dummy function""" return "Bring me a shrubbery!" 
-@task() -def print_value(value): +@task +def print_value(value, ts=None): """Dummy function""" - ctx = get_current_context() - log.info("The knights of Ni say: %s (at %s)", value, ctx['ts']) + log.info("The knights of Ni say: %s (at %s)", value, ts) with DAG( @@ -47,12 +46,7 @@ with DAG( schedule_interval=None, tags=['example'], ) as dag: - task1 = PythonOperator( - task_id='generate_value', - python_callable=generate_value, - ) - - print_value(task1.output) + print_value(generate_value()) with DAG( "example_xcom_args_with_operators", diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 719ae35..315f66a 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -214,7 +214,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint): "executor_config": "{}", "hostname": "", "max_tries": 0, - "operator": "PythonOperator", + "operator": "_PythonDecoratedOperator", "pid": 100, "pool": "default_pool", "pool_slots": 1, @@ -254,7 +254,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint): "executor_config": "{}", "hostname": "", "max_tries": 0, - "operator": "PythonOperator", + "operator": "_PythonDecoratedOperator", "pid": 100, "pool": "default_pool", "pool_slots": 1,