This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 829f19f  Updating core example DAGs to use TaskFlow API where applicable (#18562)
829f19f is described below

commit 829f19f94ea88db1825261ef2fd4920f8a9558eb
Author: Josh Fell <48934154+josh-f...@users.noreply.github.com>
AuthorDate: Wed Oct 13 09:24:50 2021 -0400

    Updating core example DAGs to use TaskFlow API where applicable (#18562)
---
 airflow/example_dags/example_branch_operator.py    |   3 +-
 airflow/example_dags/example_complex.py            |   3 +-
 airflow/example_dags/example_dag_decorator.py      |   2 +-
 .../example_dags/example_kubernetes_executor.py    |  93 +++++-----
 .../example_kubernetes_executor_config.py          | 203 ++++++++++-----------
 airflow/example_dags/example_nested_branch_dag.py  |   5 +-
 .../example_passing_params_via_test_command.py     |  20 +-
 airflow/example_dags/example_python_operator.py    |  43 ++---
 airflow/example_dags/example_skip_dag.py           |  15 +-
 airflow/example_dags/example_trigger_target_dag.py |  11 +-
 airflow/example_dags/example_xcom.py               |  48 ++---
 airflow/example_dags/example_xcomargs.py           |  16 +-
 .../endpoints/test_task_instance_endpoint.py       |   4 +-
 13 files changed, 222 insertions(+), 244 deletions(-)

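The conversion pattern applied throughout this commit replaces explicit
PythonOperator instantiations with functions decorated by @task; calling the
decorated function inside the DAG context creates the task. A minimal sketch of
the idea (the DAG id and function below are illustrative, not from the commit):

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task

    with DAG(dag_id="taskflow_sketch", start_date=datetime(2021, 1, 1), catchup=False) as dag:

        @task  # replaces PythonOperator(task_id="greet", python_callable=greet)
        def greet():
            print("hello")

        greet_task = greet()  # calling the function instantiates the task
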
diff --git a/airflow/example_dags/example_branch_operator.py b/airflow/example_dags/example_branch_operator.py
index afc0340..69f939e 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -25,6 +25,7 @@ from airflow import DAG
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import BranchPythonOperator
 from airflow.utils.edgemodifier import Label
+from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
     dag_id='example_branch_operator',
@@ -47,7 +48,7 @@ with DAG(
 
     join = DummyOperator(
         task_id='join',
-        trigger_rule='none_failed_min_one_success',
+        trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
     )
 
     for option in options:
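
TriggerRule here subclasses str, so the enum member should compare equal to the
raw string it replaces and DAG behavior is unchanged; a typo now fails at import
time rather than at runtime. A quick illustrative check (assumed, based on the
enum being a str subclass):

    from airflow.utils.trigger_rule import TriggerRule

    # The enum member carries the same value the string literal used to.
    assert TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS == "none_failed_min_one_success"
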
diff --git a/airflow/example_dags/example_complex.py b/airflow/example_dags/example_complex.py
index 542dee8..a141236 100644
--- a/airflow/example_dags/example_complex.py
+++ b/airflow/example_dags/example_complex.py
@@ -24,7 +24,6 @@ from datetime import datetime
 from airflow import models
 from airflow.models.baseoperator import chain
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
 
 with models.DAG(
     dag_id="example_complex",
@@ -131,7 +130,7 @@ with models.DAG(
     )
 
     # Search
-    search_catalog = PythonOperator(task_id="search_catalog", python_callable=lambda: print("search_catalog"))
+    search_catalog = BashOperator(task_id="search_catalog", bash_command="echo search_catalog")
 
     search_catalog_result = BashOperator(
         task_id="search_catalog_result", bash_command="echo 
search_catalog_result"
diff --git a/airflow/example_dags/example_dag_decorator.py b/airflow/example_dags/example_dag_decorator.py
index 2a74988..66b0fa4 100644
--- a/airflow/example_dags/example_dag_decorator.py
+++ b/airflow/example_dags/example_dag_decorator.py
@@ -26,7 +26,7 @@ from airflow.operators.email import EmailOperator
 
 
 class GetRequestOperator(BaseOperator):
-    """Custom operator to sand GET request to provided url"""
+    """Custom operator to send GET request to provided url"""
 
     def __init__(self, *, url: str, **kwargs):
         super().__init__(**kwargs)
diff --git a/airflow/example_dags/example_kubernetes_executor.py b/airflow/example_dags/example_kubernetes_executor.py
index ded6e97..b7ca2de 100644
--- a/airflow/example_dags/example_kubernetes_executor.py
+++ b/airflow/example_dags/example_kubernetes_executor.py
@@ -22,8 +22,8 @@ import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.operators.python import PythonOperator
 
 with DAG(
     dag_id='example_kubernetes_executor',
@@ -32,6 +32,33 @@ with DAG(
     catchup=False,
     tags=['example', 'example2'],
 ) as dag:
+    # You don't have to use any special KubernetesExecutor configuration if you don't want to
+    @task
+    def start_task():
+        print_stuff()
+
+    # But you can if you want to
+    kube_exec_config_special = {"KubernetesExecutor": {"image": "airflow/ci:latest"}}
+
+    @task(executor_config=kube_exec_config_special)
+    def one_task():
+        print_stuff()
+
+    # Use the zip binary, which is only found in this special docker image
+    kube_exec_config_zip_binary = {"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}}
+
+    @task(task_id="two_task", executor_config=kube_exec_config_zip_binary)
+    def assert_zip_binary():
+        """
+        Checks whether Zip is installed.
+
+        :raises SystemError: if zip is not installed
+        """
+        return_code = os.system("zip")
+        if return_code != 0:
+            raise SystemError("The zip binary is not found")
+
+    # Limit resources on this operator/task with node affinity & tolerations
     affinity = {
         'podAntiAffinity': {
             'requiredDuringSchedulingIgnoredDuringExecution': [
@@ -47,52 +74,30 @@ with DAG(
 
     tolerations = [{'key': 'dedicated', 'operator': 'Equal', 'value': 'airflow'}]
 
-    def assert_zip_binary():
-        """
-        Checks whether Zip is installed.
-
-        :raises SystemError: if zip is not installed
-        """
-        return_code = os.system("zip")
-        if return_code != 0:
-            raise SystemError("The zip binary is not found")
-
-    # You don't have to use any special KubernetesExecutor configuration if you don't want to
-    start_task = PythonOperator(task_id="start_task", python_callable=print_stuff)
+    kube_exec_config_resource_limits = {
+        "KubernetesExecutor": {
+            "request_memory": "128Mi",
+            "limit_memory": "128Mi",
+            "tolerations": tolerations,
+            "affinity": affinity,
+        }
+    }
 
-    # But you can if you want to
-    one_task = PythonOperator(
-        task_id="one_task",
-        python_callable=print_stuff,
-        executor_config={"KubernetesExecutor": {"image": "airflow/ci:latest"}},
-    )
+    @task(executor_config=kube_exec_config_resource_limits)
+    def three_task():
+        print_stuff()
 
-    # Use the zip binary, which is only found in this special docker image
-    two_task = PythonOperator(
-        task_id="two_task",
-        python_callable=assert_zip_binary,
-        executor_config={"KubernetesExecutor": {"image": "airflow/ci_zip:latest"}},
-    )
+    # Add arbitrary labels to worker pods
+    kube_exec_config_pod_labels = {"KubernetesExecutor": {"labels": {"foo": "bar"}}}
 
-    # Limit resources on this operator/task with node affinity & tolerations
-    three_task = PythonOperator(
-        task_id="three_task",
-        python_callable=print_stuff,
-        executor_config={
-            "KubernetesExecutor": {
-                "request_memory": "128Mi",
-                "limit_memory": "128Mi",
-                "tolerations": tolerations,
-                "affinity": affinity,
-            }
-        },
-    )
+    @task(executor_config=kube_exec_config_pod_labels)
+    def four_task():
+        print_stuff()
 
-    # Add arbitrary labels to worker pods
-    four_task = PythonOperator(
-        task_id="four_task",
-        python_callable=print_stuff,
-        executor_config={"KubernetesExecutor": {"labels": {"foo": "bar"}}},
-    )
+    start_task = start_task()
+    one_task = one_task()
+    two_task = assert_zip_binary()
+    three_task = three_task()
+    four_task = four_task()
 
     start_task >> [one_task, two_task, three_task, four_task]
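
Per-task KubernetesExecutor options survive the conversion: executor_config
moves from the PythonOperator constructor to the @task decorator. A standalone
sketch (the image name is illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task

    with DAG(dag_id="k8s_executor_sketch", start_date=datetime(2021, 1, 1), catchup=False) as dag:

        @task(executor_config={"KubernetesExecutor": {"image": "example/image:latest"}})
        def run_in_custom_image():
            print("runs in a worker pod using the configured image")

        run_in_custom_image()
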
diff --git a/airflow/example_dags/example_kubernetes_executor_config.py b/airflow/example_dags/example_kubernetes_executor_config.py
index 88fced8..87077c9 100644
--- a/airflow/example_dags/example_kubernetes_executor_config.py
+++ b/airflow/example_dags/example_kubernetes_executor_config.py
@@ -23,8 +23,8 @@ import os
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.example_dags.libs.helper import print_stuff
-from airflow.operators.python import PythonOperator
 from airflow.settings import AIRFLOW_HOME
 
 log = logging.getLogger(__name__)
@@ -39,20 +39,40 @@ try:
         catchup=False,
         tags=['example3'],
     ) as dag:
+        # You can use annotations on your kubernetes pods!
+        start_task_executor_config = {
+            "pod_override": 
k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
+        }
 
-        def test_sharedvolume_mount():
-            """
-            Tests whether the volume has been mounted.
-            """
-            for i in range(5):
-                try:
-                    return_code = os.system("cat /shared/test.txt")
-                    if return_code != 0:
-                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
-                except ValueError as e:
-                    if i > 4:
-                        raise e
+        @task(executor_config=start_task_executor_config)
+        def start_task():
+            print_stuff()
+
+        start_task = start_task()
+
+        # [START task_with_volume]
+        executor_config_volume_mount = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[
+                                k8s.V1VolumeMount(mount_path="/foo/", 
name="example-kubernetes-test-volume")
+                            ],
+                        )
+                    ],
+                    volumes=[
+                        k8s.V1Volume(
+                            name="example-kubernetes-test-volume",
+                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
+                        )
+                    ],
+                )
+            ),
+        }
 
+        @task(executor_config=executor_config_volume_mount)
         def test_volume_mount():
             """
             Tests whether the volume has been mounted.
@@ -64,104 +84,83 @@ try:
             if return_code != 0:
                 raise ValueError(f"Error when checking volume mount. Return 
code {return_code}")
 
-        # You can use annotations on your kubernetes pods!
-        start_task = PythonOperator(
-            task_id="start_task",
-            python_callable=print_stuff,
-            executor_config={
-                "pod_override": 
k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"test": "annotation"}))
-            },
-        )
-
-        # [START task_with_volume]
-        volume_task = PythonOperator(
-            task_id="task_with_volume",
-            python_callable=test_volume_mount,
-            executor_config={
-                "pod_override": k8s.V1Pod(
-                    spec=k8s.V1PodSpec(
-                        containers=[
-                            k8s.V1Container(
-                                name="base",
-                                volume_mounts=[
-                                    k8s.V1VolumeMount(
-                                        mount_path="/foo/", name="example-kubernetes-test-volume"
-                                    )
-                                ],
-                            )
-                        ],
-                        volumes=[
-                            k8s.V1Volume(
-                                name="example-kubernetes-test-volume",
-                                host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
-                            )
-                        ],
-                    )
-                ),
-            },
-        )
+        volume_task = test_volume_mount()
         # [END task_with_volume]
 
         # [START task_with_template]
-        task_with_template = PythonOperator(
-            task_id="task_with_template",
-            python_callable=print_stuff,
-            executor_config={
-                "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
-                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
-            },
-        )
+        executor_config_template = {
+            "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
+            "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
+        }
+
+        @task(executor_config=executor_config_template)
+        def task_with_template():
+            print_stuff()
+
+        task_with_template = task_with_template()
         # [END task_with_template]
 
         # [START task_with_sidecar]
-        sidecar_task = PythonOperator(
-            task_id="task_with_sidecar",
-            python_callable=test_sharedvolume_mount,
-            executor_config={
-                "pod_override": k8s.V1Pod(
-                    spec=k8s.V1PodSpec(
-                        containers=[
-                            k8s.V1Container(
-                                name="base",
-                                volume_mounts=[
-                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
-                                ],
-                            ),
-                            k8s.V1Container(
-                                name="sidecar",
-                                image="ubuntu",
-                                args=["echo \"retrieved from mount\" > /shared/test.txt"],
-                                command=["bash", "-cx"],
-                                volume_mounts=[
-                                    k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")
-                                ],
-                            ),
-                        ],
-                        volumes=[
-                            k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
-                        ],
-                    )
-                ),
-            },
-        )
+        executor_config_sidecar = {
+            "pod_override": k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[
+                        k8s.V1Container(
+                            name="base",
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                        k8s.V1Container(
+                            name="sidecar",
+                            image="ubuntu",
+                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
+                            command=["bash", "-cx"],
+                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
+                        ),
+                    ],
+                    volumes=[
+                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
+                    ],
+                )
+            ),
+        }
+
+        @task(executor_config=executor_config_sidecar)
+        def test_sharedvolume_mount():
+            """
+            Tests whether the volume has been mounted.
+            """
+            for i in range(5):
+                try:
+                    return_code = os.system("cat /shared/test.txt")
+                    if return_code != 0:
+                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
+                except ValueError as e:
+                    if i > 4:
+                        raise e
+
+        sidecar_task = test_sharedvolume_mount()
         # [END task_with_sidecar]
 
         # Test that we can add labels to pods
-        third_task = PythonOperator(
-            task_id="non_root_task",
-            python_callable=print_stuff,
-            executor_config={
-                "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
-            },
-        )
-
-        other_ns_task = PythonOperator(
-            task_id="other_namespace_task",
-            python_callable=print_stuff,
-            executor_config={
-                "KubernetesExecutor": {"namespace": "test-namespace", "labels": {"release": "stable"}}
-            },
-        )
+        executor_config_non_root = {
+            "pod_override": 
k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"}))
+        }
+
+        @task(executor_config=executor_config_non_root)
+        def non_root_task():
+            print_stuff()
+
+        third_task = non_root_task()
+
+        executor_config_other_ns = {
+            "KubernetesExecutor": {"namespace": "test-namespace", "labels": 
{"release": "stable"}}
+        }
+
+        @task(executor_config=executor_config_other_ns)
+        def other_namespace_task():
+            print_stuff()
+
+        other_ns_task = other_namespace_task()
 
         start_task >> volume_task >> third_task
         start_task >> other_ns_task
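
The refactor also builds each pod_override in a named dict before handing it to
@task, which keeps the decorator line readable. A condensed sketch, assuming the
kubernetes client package is importable as in the example file (annotation
key/value illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from kubernetes.client import models as k8s

    executor_config = {
        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(annotations={"example": "value"}))
    }

    with DAG(dag_id="pod_override_sketch", start_date=datetime(2021, 1, 1), catchup=False) as dag:

        @task(executor_config=executor_config)
        def annotated_task():
            print("the worker pod carries the override annotation")

        annotated_task()
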
diff --git a/airflow/example_dags/example_nested_branch_dag.py b/airflow/example_dags/example_nested_branch_dag.py
index cd036b0..add81a9 100644
--- a/airflow/example_dags/example_nested_branch_dag.py
+++ b/airflow/example_dags/example_nested_branch_dag.py
@@ -26,6 +26,7 @@ from datetime import datetime
 from airflow.models import DAG
 from airflow.operators.dummy import DummyOperator
 from airflow.operators.python import BranchPythonOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 with DAG(
     dag_id="example_nested_branch_dag",
@@ -35,11 +36,11 @@ with DAG(
     tags=["example"],
 ) as dag:
     branch_1 = BranchPythonOperator(task_id="branch_1", python_callable=lambda: "true_1")
-    join_1 = DummyOperator(task_id="join_1", trigger_rule="none_failed_min_one_success")
+    join_1 = DummyOperator(task_id="join_1", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
     true_1 = DummyOperator(task_id="true_1")
     false_1 = DummyOperator(task_id="false_1")
     branch_2 = BranchPythonOperator(task_id="branch_2", python_callable=lambda: "true_2")
-    join_2 = DummyOperator(task_id="join_2", trigger_rule="none_failed_min_one_success")
+    join_2 = DummyOperator(task_id="join_2", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
     true_2 = DummyOperator(task_id="true_2")
     false_2 = DummyOperator(task_id="false_2")
     false_3 = DummyOperator(task_id="false_3")
diff --git a/airflow/example_dags/example_passing_params_via_test_command.py b/airflow/example_dags/example_passing_params_via_test_command.py
index 41930d2..e3f04c4 100644
--- a/airflow/example_dags/example_passing_params_via_test_command.py
+++ b/airflow/example_dags/example_passing_params_via_test_command.py
@@ -23,11 +23,12 @@ from datetime import datetime, timedelta
 from textwrap import dedent
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
 
 
-def my_py_command(test_mode, params):
+@task(task_id="run_this")
+def my_py_command(params, test_mode=None, task=None):
     """
     Print out the "foo" param passed in via
     `airflow tasks test example_passing_params_via_test_command run_this <date>
@@ -37,7 +38,7 @@ def my_py_command(test_mode, params):
         print(
             " 'foo' was passed in via test={} command : kwargs[params][foo] \
                = {}".format(
-                test_mode, params["foo"]
+                test_mode, task.params["foo"]
             )
         )
     # Print out the value of "miff", passed in below via the Python Operator
@@ -45,7 +46,8 @@ def my_py_command(test_mode, params):
     return 1
 
 
-def print_env_vars(test_mode):
+@task(task_id="env_var_test_task")
+def print_env_vars(test_mode=None):
     """
     Print out the "foo" param passed in via
     `airflow tasks test example_passing_params_via_test_command env_var_test_task <date>
@@ -64,6 +66,8 @@ with DAG(
     dagrun_timeout=timedelta(minutes=4),
     tags=['example'],
 ) as dag:
+    run_this = my_py_command(params={"miff": "agg"})
+
     my_templated_command = dedent(
         """
         echo " 'foo was passed in via Airflow CLI Test command with value {{ 
params.foo }} "
@@ -71,18 +75,12 @@ with DAG(
     """
     )
 
-    run_this = PythonOperator(
-        task_id='run_this',
-        python_callable=my_py_command,
-        params={"miff": "agg"},
-    )
-
     also_run_this = BashOperator(
         task_id='also_run_this',
         bash_command=my_templated_command,
         params={"miff": "agg"},
     )
 
-    env_var_test_task = PythonOperator(task_id='env_var_test_task', python_callable=print_env_vars)
+    env_var_test_task = print_env_vars()
 
     run_this >> also_run_this
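
Note the signature change above: with TaskFlow, context values such as params,
test_mode, and task are no longer dug out of **kwargs; Airflow fills in any
context key declared as a keyword argument with a default. A sketch of the
mechanism (DAG id and values illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task

    with DAG(dag_id="context_kwargs_sketch", start_date=datetime(2021, 1, 1), catchup=False) as dag:

        @task
        def show_context(params=None, ti=None):
            # Both arguments are injected from the task context at runtime.
            print(params, ti.task_id)

        show_context(params={"miff": "agg"})
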
diff --git a/airflow/example_dags/example_python_operator.py b/airflow/example_dags/example_python_operator.py
index 50ca185..8d5ce59 100644
--- a/airflow/example_dags/example_python_operator.py
+++ b/airflow/example_dags/example_python_operator.py
@@ -16,13 +16,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-"""Example DAG demonstrating the usage of the PythonOperator."""
+"""
+Example DAG demonstrating the usage of the TaskFlow API to execute Python functions natively and within a
+virtual environment.
+"""
 import time
 from datetime import datetime
 from pprint import pprint
 
 from airflow import DAG
-from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
+from airflow.decorators import task
 
 with DAG(
     dag_id='example_python_operator',
@@ -32,35 +35,34 @@ with DAG(
     tags=['example'],
 ) as dag:
     # [START howto_operator_python]
-    def print_context(ds, **kwargs):
+    @task(task_id="print_the_context")
+    def print_context(ds=None, **kwargs):
         """Print the Airflow context and ds variable from the context."""
         pprint(kwargs)
         print(ds)
         return 'Whatever you return gets printed in the logs'
 
-    run_this = PythonOperator(
-        task_id='print_the_context',
-        python_callable=print_context,
-    )
+    run_this = print_context()
     # [END howto_operator_python]
 
     # [START howto_operator_python_kwargs]
-    def my_sleeping_function(random_base):
-        """This is a function that will run within the DAG execution"""
-        time.sleep(random_base)
-
     # Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
     for i in range(5):
-        task = PythonOperator(
-            task_id='sleep_for_' + str(i),
-            python_callable=my_sleeping_function,
-            op_kwargs={'random_base': float(i) / 10},
-        )
 
-        run_this >> task
+        @task(task_id=f'sleep_for_{i}')
+        def my_sleeping_function(random_base):
+            """This is a function that will run within the DAG execution"""
+            time.sleep(random_base)
+
+        sleeping_task = my_sleeping_function(random_base=float(i) / 10)
+
+        run_this >> sleeping_task
     # [END howto_operator_python_kwargs]
 
     # [START howto_operator_python_venv]
+    @task.virtualenv(
+        task_id="virtualenv_python", requirements=["colorama==0.4.0"], 
system_site_packages=False
+    )
     def callable_virtualenv():
         """
         Example function that will be performed in a virtual environment.
@@ -81,10 +83,5 @@ with DAG(
             sleep(10)
         print('Finished')
 
-    virtualenv_task = PythonVirtualenvOperator(
-        task_id="virtualenv_python",
-        python_callable=callable_virtualenv,
-        requirements=["colorama==0.4.0"],
-        system_site_packages=False,
-    )
+    virtualenv_task = callable_virtualenv()
     # [END howto_operator_python_venv]
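
@task.virtualenv builds a disposable virtual environment for each run, so the
function body must be self-contained: imports go inside the function and no
names from the enclosing module are visible. A minimal sketch (the pinned
package mirrors the example above):

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task

    with DAG(dag_id="virtualenv_sketch", start_date=datetime(2021, 1, 1), catchup=False) as dag:

        @task.virtualenv(requirements=["colorama==0.4.0"], system_site_packages=False)
        def colorful():
            # Imported here because the task runs in its own environment.
            from colorama import Fore

            print(Fore.GREEN + "hello from the virtualenv")

        colorful()
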
diff --git a/airflow/example_dags/example_skip_dag.py b/airflow/example_dags/example_skip_dag.py
index ab8c8df..cb664e7 100644
--- a/airflow/example_dags/example_skip_dag.py
+++ b/airflow/example_dags/example_skip_dag.py
@@ -23,6 +23,7 @@ from datetime import datetime
 from airflow import DAG
 from airflow.exceptions import AirflowSkipException
 from airflow.operators.dummy import DummyOperator
+from airflow.utils.trigger_rule import TriggerRule
 
 
 # Create some placeholder operators
@@ -35,7 +36,7 @@ class DummySkipOperator(DummyOperator):
         raise AirflowSkipException
 
 
-def create_test_pipeline(suffix, trigger_rule, dag_):
+def create_test_pipeline(suffix, trigger_rule):
     """
     Instantiate a number of operators for the given DAG.
 
@@ -43,10 +44,10 @@ def create_test_pipeline(suffix, trigger_rule, dag_):
     :param str trigger_rule: TriggerRule for the join task
     :param DAG dag_: The DAG to run the operators on
     """
-    skip_operator = DummySkipOperator(task_id=f'skip_operator_{suffix}', dag=dag_)
-    always_true = DummyOperator(task_id=f'always_true_{suffix}', dag=dag_)
-    join = DummyOperator(task_id=trigger_rule, dag=dag_, trigger_rule=trigger_rule)
-    final = DummyOperator(task_id=f'final_{suffix}', dag=dag_)
+    skip_operator = DummySkipOperator(task_id=f'skip_operator_{suffix}')
+    always_true = DummyOperator(task_id=f'always_true_{suffix}')
+    join = DummyOperator(task_id=trigger_rule, trigger_rule=trigger_rule)
+    final = DummyOperator(task_id=f'final_{suffix}')
 
     skip_operator >> join
     always_true >> join
@@ -54,5 +55,5 @@ def create_test_pipeline(suffix, trigger_rule, dag_):
 
 
 with DAG(dag_id='example_skip_dag', start_date=datetime(2021, 1, 1), catchup=False, tags=['example']) as dag:
-    create_test_pipeline('1', 'all_success', dag)
-    create_test_pipeline('2', 'one_success', dag)
+    create_test_pipeline('1', TriggerRule.ALL_SUCCESS)
+    create_test_pipeline('2', TriggerRule.ONE_SUCCESS)
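
Dropping the dag_ argument works because operators created inside a with DAG
block attach to the active DAG automatically; an explicit dag= is only needed
outside a context manager. A sketch:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    with DAG(dag_id="active_dag_sketch", start_date=datetime(2021, 1, 1), catchup=False) as dag:
        # No dag= argument: the operator registers with the enclosing DAG.
        inside = DummyOperator(task_id="inside")
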
diff --git a/airflow/example_dags/example_trigger_target_dag.py b/airflow/example_dags/example_trigger_target_dag.py
index 32e06a3..41aecf1 100644
--- a/airflow/example_dags/example_trigger_target_dag.py
+++ b/airflow/example_dags/example_trigger_target_dag.py
@@ -24,18 +24,19 @@ Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
 
 
-def run_this_func(dag_run):
+@task(task_id="run_this")
+def run_this_func(dag_run=None):
     """
     Print the payload "message" passed to the DagRun conf attribute.
 
     :param dag_run: The DagRun object
     :type dag_run: DagRun
     """
-    print(f"Remotely received value of {dag_run.conf['message']} for 
key=message")
+    print(f"Remotely received value of {dag_run.conf.get('message')} for 
key=message")
 
 
 with DAG(
@@ -45,10 +46,10 @@ with DAG(
     schedule_interval=None,
     tags=['example'],
 ) as dag:
-    run_this = PythonOperator(task_id="run_this", python_callable=run_this_func)
+    run_this = run_this_func()
 
     bash_task = BashOperator(
         task_id="bash_task",
         bash_command='echo "Here is the message: $message"',
-        env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},
+        env={'message': '{{ dag_run.conf.get("message") }}'},
     )
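
Switching to dag_run.conf.get("message") means a trigger without a payload
yields None instead of raising a KeyError in the Python task, and the templated
env degrades similarly. A sketch of the injected dag_run argument (DAG id
illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task

    with DAG(dag_id="conf_sketch", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:

        @task
        def read_conf(dag_run=None):
            # .get() returns None when the run was triggered without conf.
            print(dag_run.conf.get("message"))

        read_conf()
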
diff --git a/airflow/example_dags/example_xcom.py b/airflow/example_dags/example_xcom.py
index a8c5d99..405d5c5 100644
--- a/airflow/example_dags/example_xcom.py
+++ b/airflow/example_dags/example_xcom.py
@@ -20,19 +20,21 @@
 from datetime import datetime
 
 from airflow import DAG
+from airflow.decorators import task
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator
 
 value_1 = [1, 2, 3]
 value_2 = {'a': 'b'}
 
 
-def push(**kwargs):
+@task
+def push(ti=None):
     """Pushes an XCom without a specific target"""
-    kwargs['ti'].xcom_push(key='value from pusher 1', value=value_1)
+    ti.xcom_push(key='value from pusher 1', value=value_1)
 
 
-def push_by_returning(**kwargs):
+@task
+def push_by_returning():
     """Pushes an XCom without a specific target, just by returning it"""
     return value_2
 
@@ -42,16 +44,17 @@ def _compare_values(pulled_value, check_value):
         raise ValueError(f'The two values differ {pulled_value} and {check_value}')
 
 
-def puller(pulled_value_1, pulled_value_2, **kwargs):
+@task
+def puller(pulled_value_2, ti=None):
     """Pull all previously pushed XComs and check if the pushed values match 
the pulled values."""
+    pulled_value_1 = ti.xcom_pull(task_ids="push", key="value from pusher 1")
 
-    # Check pulled values from function args
     _compare_values(pulled_value_1, value_1)
     _compare_values(pulled_value_2, value_2)
 
 
-def pull_value_from_bash_push(**kwargs):
-    ti = kwargs['ti']
+@task
+def pull_value_from_bash_push(ti=None):
+    bash_pushed_via_return_value = ti.xcom_pull(key="return_value", task_ids='bash_push')
+    bash_manually_pushed_value = ti.xcom_pull(key="manually_pushed_value", task_ids='bash_push')
     print(f"The xcom value pushed by task push via return value is 
{bash_pushed_via_return_value}")
@@ -65,25 +68,6 @@ with DAG(
     catchup=False,
     tags=['example'],
 ) as dag:
-    push1 = PythonOperator(
-        task_id='push',
-        python_callable=push,
-    )
-
-    push2 = PythonOperator(
-        task_id='push_by_returning',
-        python_callable=push_by_returning,
-    )
-
-    pull = PythonOperator(
-        task_id='puller',
-        python_callable=puller,
-        op_kwargs={
-            'pulled_value_1': push1.output['value from pusher 1'],
-            'pulled_value_2': push2.output,
-        },
-    )
-
     bash_push = BashOperator(
         task_id='bash_push',
         bash_command='echo "bash_push demo"  && '
@@ -101,13 +85,11 @@ with DAG(
         do_xcom_push=False,
     )
 
-    python_pull_from_bash = PythonOperator(
-        task_id='python_pull_from_bash',
-        python_callable=pull_value_from_bash_push,
-    )
+    python_pull_from_bash = pull_value_from_bash_push()
 
     [bash_pull, python_pull_from_bash] << bash_push
 
+    puller(push_by_returning()) << push()
+
     # Task dependencies created via `XComArgs`:
-    #   push1 >> pull
-    #   push2 >> pull
+    #   pull << push2
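
The net effect in this file: a plain return value becomes an XCom, and passing
one decorated task's call result into another wires both the data flow and the
dependency, while ti.xcom_pull remains available for keyed pushes. A distilled
sketch (task names illustrative):

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task

    with DAG(dag_id="xcom_sketch", start_date=datetime(2021, 1, 1), catchup=False) as dag:

        @task
        def produce():
            return {"a": "b"}  # stored as this task's return_value XCom

        @task
        def consume(value):
            print(value)  # receives the pulled XCom automatically

        consume(produce())  # argument passing creates produce >> consume
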
diff --git a/airflow/example_dags/example_xcomargs.py b/airflow/example_dags/example_xcomargs.py
index 382e9af..7e0cdd9 100644
--- a/airflow/example_dags/example_xcomargs.py
+++ b/airflow/example_dags/example_xcomargs.py
@@ -23,21 +23,20 @@ from datetime import datetime
 from airflow import DAG
 from airflow.decorators import task
 from airflow.operators.bash import BashOperator
-from airflow.operators.python import PythonOperator, get_current_context
 
 log = logging.getLogger(__name__)
 
 
+@task
 def generate_value():
     """Dummy function"""
     return "Bring me a shrubbery!"
 
 
-@task()
-def print_value(value):
+@task
+def print_value(value, ts=None):
     """Dummy function"""
-    ctx = get_current_context()
-    log.info("The knights of Ni say: %s (at %s)", value, ctx['ts'])
+    log.info("The knights of Ni say: %s (at %s)", value, ts)
 
 
 with DAG(
@@ -47,12 +46,7 @@ with DAG(
     schedule_interval=None,
     tags=['example'],
 ) as dag:
-    task1 = PythonOperator(
-        task_id='generate_value',
-        python_callable=generate_value,
-    )
-
-    print_value(task1.output)
+    print_value(generate_value())
 
 with DAG(
     "example_xcom_args_with_operators",
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 719ae35..315f66a 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -214,7 +214,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "executor_config": "{}",
             "hostname": "",
             "max_tries": 0,
-            "operator": "PythonOperator",
+            "operator": "_PythonDecoratedOperator",
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
@@ -254,7 +254,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint):
             "executor_config": "{}",
             "hostname": "",
             "max_tries": 0,
-            "operator": "PythonOperator",
+            "operator": "_PythonDecoratedOperator",
             "pid": 100,
             "pool": "default_pool",
             "pool_slots": 1,
