This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 44b4a3755d Add deferrable mode to `BeamRunPythonPipelineOperator` (#31471)
44b4a3755d is described below

commit 44b4a3755deea23f8ff0eb1316db880b1d64812c
Author: VladaZakharova <80038284+vladazakhar...@users.noreply.github.com>
AuthorDate: Thu Jul 13 18:37:11 2023 +0200

    Add deferrable mode to `BeamRunPythonPipelineOperator` (#31471)
    
    * Add deferrable mode to BeamRunPythonPipelineOperator
    
    * Fix docs
    
    * Fix deferrable parameter in init
    
    * fix system test
---
 airflow/providers/apache/beam/hooks/beam.py        | 190 ++++++++++++++++++++-
 airflow/providers/apache/beam/operators/beam.py    | 113 +++++++++---
 airflow/providers/apache/beam/provider.yaml        |   5 +
 airflow/providers/apache/beam/triggers/__init__.py |  16 ++
 airflow/providers/apache/beam/triggers/beam.py     | 116 +++++++++++++
 .../operators.rst                                  |  29 ++++
 tests/providers/apache/beam/hooks/test_beam.py     | 177 ++++++++++++++++++-
 tests/providers/apache/beam/operators/test_beam.py | 155 +++++++++++++++--
 tests/providers/apache/beam/triggers/__init__.py   |  16 ++
 tests/providers/apache/beam/triggers/test_beam.py  | 107 ++++++++++++
 .../system/providers/apache/beam/example_python.py |  11 +-
 .../{example_python.py => example_python_async.py} |  34 ++--
 12 files changed, 911 insertions(+), 58 deletions(-)

diff --git a/airflow/providers/apache/beam/hooks/beam.py 
b/airflow/providers/apache/beam/hooks/beam.py
index 264d9076e0..d92ade2050 100644
--- a/airflow/providers/apache/beam/hooks/beam.py
+++ b/airflow/providers/apache/beam/hooks/beam.py
@@ -18,8 +18,10 @@
 """This module contains a Apache Beam Hook."""
 from __future__ import annotations
 
+import asyncio
 import contextlib
 import copy
+import functools
 import json
 import logging
 import os
@@ -27,8 +29,8 @@ import select
 import shlex
 import shutil
 import subprocess
+import tempfile
 import textwrap
-from tempfile import TemporaryDirectory
 from typing import Callable
 
 from packaging.version import Version
@@ -222,7 +224,7 @@ class BeamHook(BaseHook):
             If a value is passed to this parameter, a new virtual environment has been created with
             additional packages installed.
 
-            You could also install the apache-beam package if it is not installed on your system or you want
+            You could also install the apache-beam package if it is not installed on your system, or you want
             to use a different version.
         :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
             See virtualenv documentation for more information.
@@ -251,7 +253,7 @@ class BeamHook(BaseHook):
                         """
                     )
                     raise AirflowException(warning_invalid_environment)
-                tmp_dir = exit_stack.enter_context(TemporaryDirectory(prefix="apache-beam-venv"))
+                tmp_dir = exit_stack.enter_context(tempfile.TemporaryDirectory(prefix="apache-beam-venv"))
                 py_interpreter = prepare_virtualenv(
                     venv_directory=tmp_dir,
                     python_bin=py_interpreter,
@@ -381,3 +383,185 @@ class BeamHook(BaseHook):
             command_prefix=command_prefix,
             process_line_callback=process_line_callback,
         )
+
+
+class BeamAsyncHook(BeamHook):
+    """
+    Asynchronous hook for Apache Beam.
+
+    :param runner: Runner type.
+    """
+
+    def __init__(
+        self,
+        runner: str,
+    ) -> None:
+        self.runner = runner
+        super().__init__(runner=self.runner)
+
+    @staticmethod
+    async def _create_tmp_dir(prefix: str) -> str:
+        """Helper method to create temporary directory."""
+        # Creating separate thread to create temporary directory
+        loop = asyncio.get_running_loop()
+        partial_func = functools.partial(tempfile.mkdtemp, prefix=prefix)
+        tmp_dir = await loop.run_in_executor(None, partial_func)
+        return tmp_dir
+
+    @staticmethod
+    async def _cleanup_tmp_dir(tmp_dir: str) -> None:
+        """
+        Helper method to delete temporary directory after finishing work with it.
+
+        It uses the `rmtree` method to recursively remove the temporary directory.
+        """
+        shutil.rmtree(tmp_dir)
+
+    async def start_python_pipeline_async(
+        self,
+        variables: dict,
+        py_file: str,
+        py_options: list[str] | None = None,
+        py_interpreter: str = "python3",
+        py_requirements: list[str] | None = None,
+        py_system_site_packages: bool = False,
+    ):
+        """
+        Starts Apache Beam python pipeline.
+
+        :param variables: Variables passed to the pipeline.
+        :param py_file: Path to the python file to execute.
+        :param py_options: Additional options.
+        :param py_interpreter: Python version of the Apache Beam pipeline.
+            If None, this defaults to python3.
+            To track python versions supported by beam and related
+            issues check: https://issues.apache.org/jira/browse/BEAM-1251
+        :param py_requirements: Additional python package(s) to install.
+            If a value is passed to this parameter, a new virtual environment has been created with
+            additional packages installed.
+            You could also install the apache-beam package if it is not installed on your system, or you want
+            to use a different version.
+        :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
+            See virtualenv documentation for more information.
+            This option is only relevant if the ``py_requirements`` parameter is not None.
+        """
+        if "labels" in variables:
+            variables["labels"] = [f"{key}={value}" for key, value in variables["labels"].items()]
+
+        # Creating temporary directory
+        tmp_dir = await self._create_tmp_dir(prefix="apache-beam-venv")
+
+        async with contextlib.AsyncExitStack() as exit_stack:
+            if py_requirements is not None:
+                if not py_requirements and not py_system_site_packages:
+                    warning_invalid_environment = textwrap.dedent(
+                        """\
+                        Invalid method invocation. You have disabled inclusion of system packages and empty
+                        list required for installation, so it is not possible to create a valid virtual
+                        environment. In the virtual environment, apache-beam package must be installed for
+                        your job to be executed.
+
+                        To fix this problem:
+                        * install apache-beam on the system, then set parameter py_system_site_packages
+                          to True,
+                        * add apache-beam to the list of required packages in parameter py_requirements.
+                        """
+                    )
+                    raise AirflowException(warning_invalid_environment)
+
+                # Pushing asynchronous callback to ensure the cleanup of the temporary
+                # directory when the asynchronous context is exited
+                exit_stack.push_async_callback(self._cleanup_tmp_dir, tmp_dir)
+
+                py_interpreter = prepare_virtualenv(
+                    venv_directory=tmp_dir,
+                    python_bin=py_interpreter,
+                    system_site_packages=py_system_site_packages,
+                    requirements=py_requirements,
+                )
+            command_prefix: list[str] = [py_interpreter] + (py_options or []) + [py_file]
+            beam_version = (
+                subprocess.check_output(
+                    [py_interpreter, "-c", "import apache_beam; print(apache_beam.__version__)"]
+                )
+                .decode()
+                .strip()
+            )
+            self.log.info("Beam version: %s", beam_version)
+            impersonate_service_account = variables.get("impersonate_service_account")
+            if impersonate_service_account:
+                if Version(beam_version) < Version("2.39.0"):
+                    raise AirflowException(
+                        "The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."
+                    )
+            return_code = await self.start_pipeline_async(
+                variables=variables,
+                command_prefix=command_prefix,
+            )
+            return return_code
+
+    async def start_pipeline_async(
+        self,
+        variables: dict,
+        command_prefix: list[str],
+        working_directory: str | None = None,
+    ) -> int:
+        cmd = command_prefix + [
+            f"--runner={self.runner}",
+        ]
+        if variables:
+            cmd.extend(beam_options_to_args(variables))
+        return await self.run_beam_command_async(
+            cmd=cmd,
+            working_directory=working_directory,
+            log=self.log,
+        )
+
+    async def run_beam_command_async(
+        self,
+        cmd: list[str],
+        log: logging.Logger,
+        working_directory: str | None = None,
+    ) -> int:
+        """
+        Function responsible for running pipeline command in subprocess.
+
+        :param cmd: Parts of the command to be run in subprocess
+        :param working_directory: Working directory
+        :param log: logger.
+        """
+        cmd_str_representation = " ".join(shlex.quote(c) for c in cmd)
+        log.info("Running command: %s", cmd_str_representation)
+
+        # Creating a separate asynchronous process
+        process = await asyncio.create_subprocess_shell(
+            cmd_str_representation,
+            shell=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            close_fds=True,
+            cwd=working_directory,
+        )
+        # Waits for Apache Beam pipeline to complete.
+        log.info("Start waiting for Apache Beam process to complete.")
+
+        # Creating separate tasks to read stdout and stderr concurrently
+        stdout_task = asyncio.create_task(self.read_logs(process.stdout))
+        stderr_task = asyncio.create_task(self.read_logs(process.stderr))
+
+        # Waiting for both tasks to complete
+        await asyncio.gather(stdout_task, stderr_task)
+
+        # Wait for the process to complete and return return_code
+        return_code = await process.wait()
+        log.info("Process exited with return code: %s", return_code)
+
+        if return_code != 0:
+            raise AirflowException(f"Apache Beam process failed with return code {return_code}")
+        return return_code
+
+    async def read_logs(self, stream_reader):
+        while True:
+            line = await stream_reader.readline()
+            if not line:
+                break
+            decoded_line = line.decode().strip()
+            self.log.info(decoded_line)
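
The hook above drives the Beam process with asyncio rather than threads: it spawns the command via `create_subprocess_shell`, drains stdout and stderr concurrently, and only then checks the return code. A minimal standalone sketch of that pattern (illustrative only, not part of this commit; the command and function names are made up):

import asyncio
import shlex


async def _stream(reader: asyncio.StreamReader, prefix: str) -> None:
    # Mirror the read_logs helper: read the pipe line by line until EOF.
    while True:
        line = await reader.readline()
        if not line:
            break
        print(f"{prefix}: {line.decode().strip()}")


async def run_command(cmd: list[str]) -> int:
    # Quote the argv list into a single shell string, as the hook does.
    cmd_str = " ".join(shlex.quote(c) for c in cmd)
    process = await asyncio.create_subprocess_shell(
        cmd_str,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    # Drain both pipes concurrently so a full buffer never blocks the child process.
    await asyncio.gather(
        _stream(process.stdout, "stdout"),
        _stream(process.stderr, "stderr"),
    )
    return_code = await process.wait()
    if return_code != 0:
        raise RuntimeError(f"Command failed with return code {return_code}")
    return return_code


if __name__ == "__main__":
    asyncio.run(run_command(["python3", "-c", "print('hello from a subprocess')"]))
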
diff --git a/airflow/providers/apache/beam/operators/beam.py 
b/airflow/providers/apache/beam/operators/beam.py
index 68e6972547..cc7d217abb 100644
--- a/airflow/providers/apache/beam/operators/beam.py
+++ b/airflow/providers/apache/beam/operators/beam.py
@@ -18,6 +18,8 @@
 """This module contains Apache Beam operators."""
 from __future__ import annotations
 
+import asyncio
+import contextlib
 import copy
 import os
 import stat
@@ -26,11 +28,13 @@ from abc import ABC, ABCMeta, abstractmethod
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from contextlib import ExitStack
 from functools import partial
-from typing import TYPE_CHECKING, Callable, Sequence
+from typing import IO, TYPE_CHECKING, Any, Callable, Sequence
 
 from airflow import AirflowException
+from airflow.configuration import conf
 from airflow.models import BaseOperator
 from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType
+from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger
 from airflow.providers.google.cloud.hooks.dataflow import (
     DataflowHook,
     process_line_and_extract_dataflow_job_id_callback,
@@ -144,7 +148,7 @@ class BeamBasePipelineOperator(BaseOperator, 
BeamDataflowMixin, ABC):
         When defining labels (labels option), you can also provide a 
dictionary.
     :param gcp_conn_id: Optional.
         The connection ID to use connecting to Google Cloud Storage if python 
file is on GCS.
-    :param dataflow_config: Dataflow configuration, used when runner type is 
set to DataflowRunner,
+    :param dataflow_config: Dataflow's configuration, used when runner type is 
set to DataflowRunner,
         (optional) defaults to None.
     """
 
@@ -167,7 +171,7 @@ class BeamBasePipelineOperator(BaseOperator, 
BeamDataflowMixin, ABC):
             self.dataflow_config = DataflowConfiguration(**dataflow_config)
         else:
             self.dataflow_config = dataflow_config or DataflowConfiguration()
-        self.beam_hook: BeamHook | None = None
+        self.beam_hook: BeamHook
         self.dataflow_hook: DataflowHook | None = None
         self.dataflow_job_id: str | None = None
 
@@ -237,8 +241,8 @@ class 
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
         to use a different version.
     :param py_system_site_packages: Whether to include system_site_packages in 
your virtualenv.
         See virtualenv documentation for more information.
-
         This option is only relevant if the ``py_requirements`` parameter is 
not None.
+    :param deferrable: Run operator in the deferrable mode: checks for the state using asynchronous calls.
     """
 
     template_fields: Sequence[str] = (
@@ -264,6 +268,7 @@ class 
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
         py_system_site_packages: bool = False,
         gcp_conn_id: str = "google_cloud_default",
         dataflow_config: DataflowConfiguration | dict | None = None,
+        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         **kwargs,
     ) -> None:
         super().__init__(
@@ -283,35 +288,42 @@ class 
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
         self.pipeline_options.setdefault("labels", {}).update(
             {"airflow-version": "v" + version.replace(".", "-").replace("+", 
"-")}
         )
+        self.deferrable = deferrable
 
     def execute(self, context: Context):
         """Execute the Apache Beam Pipeline."""
         (
-            is_dataflow,
-            dataflow_job_name,
-            snake_case_pipeline_options,
-            process_line_callback,
+            self.is_dataflow,
+            self.dataflow_job_name,
+            self.snake_case_pipeline_options,
+            self.process_line_callback,
         ) = self._init_pipeline_options(format_pipeline_options=True, 
job_name_variable_key="job_name")
-
         if not self.beam_hook:
             raise AirflowException("Beam hook is not defined.")
+        # Check deferrable parameter passed to the operator
+        # to determine type of run - asynchronous or synchronous
+        if self.deferrable:
+            asyncio.run(self.execute_async(context))
+        else:
+            return self.execute_sync(context)
 
+    def execute_sync(self, context: Context):
         with ExitStack() as exit_stack:
             if self.py_file.lower().startswith("gs://"):
                 gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
                 tmp_gcs_file = 
exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file))
                 self.py_file = tmp_gcs_file.name
 
-            if is_dataflow and self.dataflow_hook:
+            if self.is_dataflow and self.dataflow_hook:
                 with self.dataflow_hook.provide_authorized_gcloud():
                     self.beam_hook.start_python_pipeline(
-                        variables=snake_case_pipeline_options,
+                        variables=self.snake_case_pipeline_options,
                         py_file=self.py_file,
                         py_options=self.py_options,
                         py_interpreter=self.py_interpreter,
                         py_requirements=self.py_requirements,
                         py_system_site_packages=self.py_system_site_packages,
-                        process_line_callback=process_line_callback,
+                        process_line_callback=self.process_line_callback,
                     )
                 DataflowJobLink.persist(
                     self,
@@ -320,25 +332,82 @@ class 
BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
                     self.dataflow_config.location,
                     self.dataflow_job_id,
                 )
-                if dataflow_job_name and self.dataflow_config.location:
-                    self.dataflow_hook.wait_for_done(
-                        job_name=dataflow_job_name,
-                        location=self.dataflow_config.location,
-                        job_id=self.dataflow_job_id,
-                        multiple_jobs=False,
-                        project_id=self.dataflow_config.project_id,
-                    )
                 return {"dataflow_job_id": self.dataflow_job_id}
             else:
                 self.beam_hook.start_python_pipeline(
-                    variables=snake_case_pipeline_options,
+                    variables=self.snake_case_pipeline_options,
                     py_file=self.py_file,
                     py_options=self.py_options,
                     py_interpreter=self.py_interpreter,
                     py_requirements=self.py_requirements,
                     py_system_site_packages=self.py_system_site_packages,
-                    process_line_callback=process_line_callback,
+                    process_line_callback=self.process_line_callback,
+                )
+
+    async def execute_async(self, context: Context):
+        # Creating a new event loop to manage I/O operations asynchronously
+        loop = asyncio.get_event_loop()
+        if self.py_file.lower().startswith("gs://"):
+            gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id)
+            # Running synchronous `enter_context()` method in a separate
+            # thread using the default executor `None`. The `run_in_executor()` function returns the
+            # file object, which is created using gcs function `provide_file()`, asynchronously.
+            # This means we can perform asynchronous operations with this file.
+            create_tmp_file_call = gcs_hook.provide_file(object_url=self.py_file)
+            tmp_gcs_file: IO[str] = await loop.run_in_executor(
+                None, contextlib.ExitStack().enter_context, create_tmp_file_call
+            )
+            self.py_file = tmp_gcs_file.name
+
+        if self.is_dataflow and self.dataflow_hook:
+            DataflowJobLink.persist(
+                self,
+                context,
+                self.dataflow_config.project_id,
+                self.dataflow_config.location,
+                self.dataflow_job_id,
+            )
+            with self.dataflow_hook.provide_authorized_gcloud():
+                self.defer(
+                    trigger=BeamPipelineTrigger(
+                        variables=self.snake_case_pipeline_options,
+                        py_file=self.py_file,
+                        py_options=self.py_options,
+                        py_interpreter=self.py_interpreter,
+                        py_requirements=self.py_requirements,
+                        py_system_site_packages=self.py_system_site_packages,
+                        runner=self.runner,
+                    ),
+                    method_name="execute_complete",
                 )
+        else:
+            self.defer(
+                trigger=BeamPipelineTrigger(
+                    variables=self.snake_case_pipeline_options,
+                    py_file=self.py_file,
+                    py_options=self.py_options,
+                    py_interpreter=self.py_interpreter,
+                    py_requirements=self.py_requirements,
+                    py_system_site_packages=self.py_system_site_packages,
+                    runner=self.runner,
+                ),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context: Context, event: dict[str, Any]):
+        """
+        Callback for when the trigger fires - returns immediately.
+        Relies on trigger to throw an exception, otherwise it assumes execution was
+        successful.
+        """
+        if event["status"] == "error":
+            raise AirflowException(event["message"])
+        self.log.info(
+            "%s completed with response %s ",
+            self.task_id,
+            event["message"],
+        )
+        return {"dataflow_job_id": self.dataflow_job_id}
 
     def on_kill(self) -> None:
         if self.dataflow_hook and self.dataflow_job_id:
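
The operator changes above rely on the standard Airflow deferral handshake: `execute()` calls `self.defer()` with a trigger and a `method_name`, the worker slot is released, and once the trigger yields an event the task resumes at that method with the event payload. A minimal sketch of that handshake (illustrative only; the operator name is hypothetical, and it assumes the apache.beam provider is importable):

from __future__ import annotations

from typing import Any

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger


class MinimalDeferrableBeamOperator(BaseOperator):
    """Illustrative operator showing the defer()/execute_complete() handshake."""

    def __init__(self, *, py_file: str, deferrable: bool = False, **kwargs) -> None:
        super().__init__(**kwargs)
        self.py_file = py_file
        self.deferrable = deferrable

    def execute(self, context):
        if self.deferrable:
            # defer() raises TaskDeferred internally; the worker slot is released here
            # and the triggerer takes over running BeamPipelineTrigger.run().
            self.defer(
                trigger=BeamPipelineTrigger(variables={}, py_file=self.py_file),
                method_name="execute_complete",
            )
        # A synchronous fallback would run the pipeline in-process instead.

    def execute_complete(self, context, event: dict[str, Any]):
        # Called with the TriggerEvent payload once the trigger yields an event.
        if event["status"] == "error":
            raise AirflowException(event["message"])
        self.log.info("Pipeline finished: %s", event["message"])
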
diff --git a/airflow/providers/apache/beam/provider.yaml 
b/airflow/providers/apache/beam/provider.yaml
index d177e251d7..dbc59a95af 100644
--- a/airflow/providers/apache/beam/provider.yaml
+++ b/airflow/providers/apache/beam/provider.yaml
@@ -63,6 +63,11 @@ hooks:
     python-modules:
       - airflow.providers.apache.beam.hooks.beam
 
+triggers:
+  - integration-name: Apache Beam
+    python-modules:
+      - airflow.providers.apache.beam.triggers.beam
+
 additional-extras:
   - name: google
     dependencies:
diff --git a/airflow/providers/apache/beam/triggers/__init__.py 
b/airflow/providers/apache/beam/triggers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/airflow/providers/apache/beam/triggers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/airflow/providers/apache/beam/triggers/beam.py 
b/airflow/providers/apache/beam/triggers/beam.py
new file mode 100644
index 0000000000..2926a17b45
--- /dev/null
+++ b/airflow/providers/apache/beam/triggers/beam.py
@@ -0,0 +1,116 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Any, AsyncIterator
+
+from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+
+class BeamPipelineTrigger(BaseTrigger):
+    """
+    Trigger that checks the pipeline status until it reaches a terminal state.
+
+    :param variables: Variables passed to the pipeline.
+    :param py_file: Path to the python file to execute.
+    :param py_options: Additional options.
+    :param py_interpreter: Python version of the Apache Beam pipeline. If `None`, this defaults to
+        python3. To track python versions supported by beam and related issues
+        check: https://issues.apache.org/jira/browse/BEAM-1251
+    :param py_requirements: Additional python package(s) to install.
+        If a value is passed to this parameter, a new virtual environment has been created with
+        additional packages installed.
+
+        You could also install the apache-beam package if it is not installed on your system, or you want
+        to use a different version.
+    :param py_system_site_packages: Whether to include system_site_packages in your virtualenv.
+        See virtualenv documentation for more information.
+
+        This option is only relevant if the ``py_requirements`` parameter is not None.
+    :param runner: Runner on which pipeline will be run. By default, "DirectRunner" is being used.
+        Other possible options: DataflowRunner, SparkRunner, FlinkRunner, PortableRunner.
+        See: :class:`~providers.apache.beam.hooks.beam.BeamRunnerType`
+        See: https://beam.apache.org/documentation/runners/capability-matrix/
+    """
+
+    def __init__(
+        self,
+        variables: dict,
+        py_file: str,
+        py_options: list[str] | None = None,
+        py_interpreter: str = "python3",
+        py_requirements: list[str] | None = None,
+        py_system_site_packages: bool = False,
+        runner: str = "DirectRunner",
+    ):
+        super().__init__()
+        self.variables = variables
+        self.py_file = py_file
+        self.py_options = py_options
+        self.py_interpreter = py_interpreter
+        self.py_requirements = py_requirements
+        self.py_system_site_packages = py_system_site_packages
+        self.runner = runner
+
+    def serialize(self) -> tuple[str, dict[str, Any]]:
+        """Serializes BeamPipelineTrigger arguments and classpath."""
+        return (
+            "airflow.providers.apache.beam.triggers.beam.BeamPipelineTrigger",
+            {
+                "variables": self.variables,
+                "py_file": self.py_file,
+                "py_options": self.py_options,
+                "py_interpreter": self.py_interpreter,
+                "py_requirements": self.py_requirements,
+                "py_system_site_packages": self.py_system_site_packages,
+                "runner": self.runner,
+            },
+        )
+
+    async def run(self) -> AsyncIterator[TriggerEvent]:  # type: ignore[override]
+        """Gets current pipeline status and yields a TriggerEvent."""
+        hook = self._get_async_hook()
+        while True:
+            try:
+                return_code = await hook.start_python_pipeline_async(
+                    variables=self.variables,
+                    py_file=self.py_file,
+                    py_options=self.py_options,
+                    py_interpreter=self.py_interpreter,
+                    py_requirements=self.py_requirements,
+                    py_system_site_packages=self.py_system_site_packages,
+                )
+                if return_code == 0:
+                    yield TriggerEvent(
+                        {
+                            "status": "success",
+                            "message": "Pipeline has finished SUCCESSFULLY",
+                        }
+                    )
+                    return
+                else:
+                    yield TriggerEvent({"status": "error", "message": "Operation failed"})
+                    return
+
+            except Exception as e:
+                self.log.exception("Exception occurred while checking for pipeline state")
+                yield TriggerEvent({"status": "error", "message": str(e)})
+                return
+
+    def _get_async_hook(self) -> BeamAsyncHook:
+        return BeamAsyncHook(runner=self.runner)
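
`BeamPipelineTrigger.run()` is an async generator that yields a single `TriggerEvent`; in production the triggerer iterates it, but it can also be driven directly, much like the new unit tests below do. A usage sketch (assumes the apache.beam provider is installed; the variables are illustrative, and running it will actually execute the wordcount pipeline):

import asyncio

from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger


async def main() -> None:
    trigger = BeamPipelineTrigger(
        variables={"output": "/tmp/beam_output"},  # illustrative output path
        py_file="apache_beam.examples.wordcount",
        py_options=["-m"],
        py_requirements=["apache-beam==2.46.0"],
        runner="DirectRunner",
    )
    # The async generator yields exactly one event and then returns.
    async for event in trigger.run():
        print(event.payload)  # e.g. {"status": "success", "message": "..."}


if __name__ == "__main__":
    asyncio.run(main())
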
diff --git a/docs/apache-airflow-providers-apache-beam/operators.rst 
b/docs/apache-airflow-providers-apache-beam/operators.rst
index fedf6b0265..f7487fb264 100644
--- a/docs/apache-airflow-providers-apache-beam/operators.rst
+++ b/docs/apache-airflow-providers-apache-beam/operators.rst
@@ -61,6 +61,23 @@ Python Pipelines with DirectRunner
     :start-after: [START 
howto_operator_start_python_direct_runner_pipeline_gcs_file]
     :end-before: [END 
howto_operator_start_python_direct_runner_pipeline_gcs_file]
 
+You can use deferrable mode for this action in order to run the operator asynchronously. This frees
+up the worker when it knows it has to wait, and hands off the job of resuming the operator to a Trigger.
+As a result, while it is suspended (deferred), it does not take up a worker slot, and your cluster
+wastes far fewer resources on idle Operators or Sensors:
+
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_python_direct_runner_pipeline_local_file_async]
+    :end-before: [END howto_operator_start_python_direct_runner_pipeline_local_file_async]
+
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_python_direct_runner_pipeline_gcs_file_async]
+    :end-before: [END howto_operator_start_python_direct_runner_pipeline_gcs_file_async]
+
 Python Pipelines with DataflowRunner
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -76,6 +93,18 @@ Python Pipelines with DataflowRunner
     :start-after: [START 
howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
     :end-before: [END 
howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file]
 
+
+You can use deferrable mode for this action in order to run the operator asynchronously. This frees
+up the worker when it knows it has to wait, and hands off the job of resuming the operator to a Trigger.
+As a result, while it is suspended (deferred), it does not take up a worker slot, and your cluster
+wastes far fewer resources on idle Operators or Sensors:
+
+.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async]
+    :end-before: [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async]
+
 |
 |
 
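
In DAG code, enabling the new mode amounts to passing `deferrable=True` to the operator. A minimal sketch mirroring the bundled example (the dag_id and start date are illustrative):

from __future__ import annotations

from datetime import datetime

from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

with models.DAG(
    dag_id="example_beam_python_deferrable",  # illustrative DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    start_python_pipeline = BeamRunPythonPipelineOperator(
        task_id="start_python_pipeline_local_direct_runner",
        py_file="apache_beam.examples.wordcount",
        py_options=["-m"],
        py_requirements=["apache-beam[gcp]==2.46.0"],
        py_interpreter="python3",
        py_system_site_packages=False,
        deferrable=True,  # run the wait via the triggerer instead of blocking a worker
    )
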
diff --git a/tests/providers/apache/beam/hooks/test_beam.py 
b/tests/providers/apache/beam/hooks/test_beam.py
index 3e3e895cf9..1cfe71121c 100644
--- a/tests/providers/apache/beam/hooks/test_beam.py
+++ b/tests/providers/apache/beam/hooks/test_beam.py
@@ -21,12 +21,17 @@ import os
 import re
 import subprocess
 from unittest import mock
-from unittest.mock import ANY, MagicMock
+from unittest.mock import ANY, AsyncMock, MagicMock
 
 import pytest
 
 from airflow.exceptions import AirflowException
-from airflow.providers.apache.beam.hooks.beam import BeamHook, 
beam_options_to_args, run_beam_command
+from airflow.providers.apache.beam.hooks.beam import (
+    BeamAsyncHook,
+    BeamHook,
+    beam_options_to_args,
+    run_beam_command,
+)
 
 PY_FILE = "apache_beam.examples.wordcount"
 JAR_FILE = "unitest.jar"
@@ -391,3 +396,171 @@ class TestBeamOptionsToArgs:
     def test_beam_options_to_args(self, options, expected_args):
         args = beam_options_to_args(options)
         assert args == expected_args
+
+
+class TestBeamAsyncHook:
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async")
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir")
+    async def test_start_python_pipeline(self, mock_create_dir, mock_runner):
+        hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+        mock_create_dir.return_value = AsyncMock()
+        mock_runner.return_value = 0
+
+        await hook.start_python_pipeline_async(
+            variables=copy.deepcopy(BEAM_VARIABLES_PY),
+            py_file=PY_FILE,
+            py_options=PY_OPTIONS,
+        )
+
+        expected_cmd = [
+            "python3",
+            "-m",
+            PY_FILE,
+            f"--runner={DEFAULT_RUNNER}",
+            "--output=gs://test/output",
+            "--labels=foo=bar",
+        ]
+        mock_create_dir.assert_called_once()
+        mock_runner.assert_called_once_with(
+            cmd=expected_cmd,
+            working_directory=None,
+            log=ANY,
+        )
+
+    @pytest.mark.asyncio
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", 
return_value=b"2.35.0")
+    async def test_start_python_pipeline_unsupported_option(self, 
mock_check_output):
+        hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+
+        with pytest.raises(
+            AirflowException,
+            match=re.escape("The impersonateServiceAccount option requires 
Apache Beam 2.39.0 or newer."),
+        ):
+            await hook.start_python_pipeline_async(
+                variables={
+                    "impersonate_service_account": "t...@impersonation.com",
+                },
+                py_file="/tmp/file.py",
+                py_options=["-m"],
+                py_interpreter="python3",
+                py_requirements=None,
+                py_system_site_packages=False,
+            )
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "py_interpreter",
+        [
+            pytest.param("python", id="default python"),
+            pytest.param("python2", id="major python version 2.x"),
+            pytest.param("python3", id="major python version 3.x"),
+            pytest.param("python3.6", id="major.minor python version"),
+        ],
+    )
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async")
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir")
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", 
return_value=b"2.39.0")
+    async def test_start_python_pipeline_with_custom_interpreter(
+        self, mock_check_output, mock_create_dir, mock_runner, py_interpreter
+    ):
+        hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+        mock_create_dir.return_value = AsyncMock()
+        mock_runner.return_value = 0
+
+        await hook.start_python_pipeline_async(
+            variables=copy.deepcopy(BEAM_VARIABLES_PY),
+            py_file=PY_FILE,
+            py_options=PY_OPTIONS,
+            py_interpreter=py_interpreter,
+        )
+
+        expected_cmd = [
+            py_interpreter,
+            "-m",
+            PY_FILE,
+            f"--runner={DEFAULT_RUNNER}",
+            "--output=gs://test/output",
+            "--labels=foo=bar",
+        ]
+        mock_runner.assert_called_once_with(
+            cmd=expected_cmd,
+            working_directory=None,
+            log=ANY,
+        )
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        "current_py_requirements, current_py_system_site_packages",
+        [
+            pytest.param("foo-bar", False, id="requirements without system 
site-packages"),
+            pytest.param("foo-bar", True, id="requirements with system 
site-packages"),
+            pytest.param([], True, id="only system site-packages"),
+        ],
+    )
+    @mock.patch(BEAM_STRING.format("prepare_virtualenv"))
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async")
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", 
return_value=b"2.39.0")
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir")
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._cleanup_tmp_dir")
+    async def 
test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages(
+        self,
+        mock_cleanup_dir,
+        mock_create_dir,
+        mock_check_output,
+        mock_runner,
+        mock_virtualenv,
+        current_py_requirements,
+        current_py_system_site_packages,
+    ):
+        hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+        mock_create_dir.return_value = AsyncMock()
+        mock_virtualenv.return_value = "/dummy_dir/bin/python"
+        mock_cleanup_dir.return_value = AsyncMock()
+
+        await hook.start_python_pipeline_async(
+            variables=copy.deepcopy(BEAM_VARIABLES_PY),
+            py_file=PY_FILE,
+            py_options=PY_OPTIONS,
+            py_requirements=current_py_requirements,
+            py_system_site_packages=current_py_system_site_packages,
+        )
+
+        expected_cmd = [
+            "/dummy_dir/bin/python",
+            "-m",
+            PY_FILE,
+            f"--runner={DEFAULT_RUNNER}",
+            "--output=gs://test/output",
+            "--labels=foo=bar",
+        ]
+        mock_runner.assert_called_once_with(
+            cmd=expected_cmd,
+            working_directory=None,
+            log=ANY,
+        )
+        mock_virtualenv.assert_called_once_with(
+            venv_directory=mock.ANY,
+            python_bin="python3",
+            system_site_packages=current_py_system_site_packages,
+            requirements=current_py_requirements,
+        )
+        mock_create_dir.assert_called_once()
+
+    @pytest.mark.asyncio
+    @mock.patch(BEAM_STRING.format("run_beam_command"))
+    
@mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", 
return_value=b"2.39.0")
+    async def 
test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages(
+        self, mock_check_output, mock_runner
+    ):
+        hook = BeamAsyncHook(runner=DEFAULT_RUNNER)
+
+        with pytest.raises(AirflowException, match=r"Invalid method 
invocation\."):
+            await hook.start_python_pipeline_async(
+                variables=copy.deepcopy(BEAM_VARIABLES_PY),
+                py_file=PY_FILE,
+                py_options=PY_OPTIONS,
+                py_requirements=[],
+            )
+
+        mock_runner.assert_not_called()
diff --git a/tests/providers/apache/beam/operators/test_beam.py 
b/tests/providers/apache/beam/operators/test_beam.py
index 9bdf016f48..c5949dd868 100644
--- a/tests/providers/apache/beam/operators/test_beam.py
+++ b/tests/providers/apache/beam/operators/test_beam.py
@@ -22,11 +22,13 @@ from unittest.mock import MagicMock, call
 
 import pytest
 
+from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.providers.apache.beam.operators.beam import (
     BeamRunGoPipelineOperator,
     BeamRunJavaPipelineOperator,
     BeamRunPythonPipelineOperator,
 )
+from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger
 from airflow.providers.google.cloud.operators.dataflow import 
DataflowConfiguration
 from airflow.version import version
 
@@ -150,13 +152,6 @@ class TestBeamRunPythonPipelineOperator:
             py_system_site_packages=False,
             process_line_callback=mock.ANY,
         )
-        dataflow_hook_mock.return_value.wait_for_done.assert_called_once_with(
-            job_id=self.operator.dataflow_job_id,
-            job_name=job_name,
-            location="us-central1",
-            multiple_jobs=False,
-            project_id=dataflow_config.project_id,
-        )
         
dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with()
 
     
@mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist")
@@ -268,13 +263,6 @@ class TestBeamRunJavaPipelineOperator:
             job_class=JOB_CLASS,
             process_line_callback=mock.ANY,
         )
-        dataflow_hook_mock.return_value.wait_for_done.assert_called_once_with(
-            job_id=self.operator.dataflow_job_id,
-            job_name=job_name,
-            location="us-central1",
-            multiple_jobs=False,
-            project_id=dataflow_hook_mock.return_value.project_id,
-        )
 
     
@mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist")
     @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
@@ -696,3 +684,142 @@ class TestBeamRunGoPipelineOperator:
         self.operator.execute(None)
         self.operator.on_kill()
         dataflow_cancel_job.assert_not_called()
+
+
+class TestBeamRunPythonPipelineOperatorAsync:
+    def setup_method(self):
+        self.operator = BeamRunPythonPipelineOperator(
+            task_id=TASK_ID,
+            py_file=PY_FILE,
+            py_options=PY_OPTIONS,
+            default_pipeline_options=DEFAULT_OPTIONS,
+            pipeline_options=ADDITIONAL_OPTIONS,
+            deferrable=True,
+        )
+
+    def test_init(self):
+        """Test BeamRunPythonPipelineOperator instance is properly 
initialized."""
+        assert self.operator.task_id == TASK_ID
+        assert self.operator.py_file == PY_FILE
+        assert self.operator.runner == DEFAULT_RUNNER
+        assert self.operator.py_options == PY_OPTIONS
+        assert self.operator.py_interpreter == PY_INTERPRETER
+        assert self.operator.default_pipeline_options == DEFAULT_OPTIONS
+        assert self.operator.pipeline_options == EXPECTED_ADDITIONAL_OPTIONS
+
+    @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+    def test_async_execute_should_execute_successfully(self, gcs_hook, 
beam_hook_mock):
+        """
+        Asserts that a task is deferred and the BeamPipelineTrigger will be fired
+        when the BeamRunPythonPipelineOperator is executed with deferrable=True.
+        """
+        with pytest.raises(TaskDeferred) as exc:
+            self.operator.execute(context=mock.MagicMock())
+
+        assert isinstance(exc.value.trigger, BeamPipelineTrigger), "Trigger is 
not a BeamPipelineTrigger"
+
+    def test_async_execute_should_throw_exception(self):
+        """Tests that an AirflowException is raised in case of error event"""
+
+        with pytest.raises(AirflowException):
+            self.operator.execute_complete(
+                context=mock.MagicMock(), event={"status": "error", "message": 
"test failure message"}
+            )
+
+    def test_async_execute_logging_should_execute_successfully(self):
+        """Asserts that logging occurs as expected"""
+
+        with mock.patch.object(self.operator.log, "info") as mock_log_info:
+            self.operator.execute_complete(
+                context=mock.MagicMock(),
+                event={"status": "success", "message": "Pipeline has finished 
SUCCESSFULLY"},
+            )
+        mock_log_info.assert_called_with(
+            "%s completed with response %s ", TASK_ID, "Pipeline has finished 
SUCCESSFULLY"
+        )
+
+    @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+    def test_async_execute_direct_runner(self, gcs_hook, beam_hook_mock):
+        """
+        Test BeamHook is created and the right args are passed to
+        start_python_workflow when executing direct runner.
+        """
+        gcs_provide_file = gcs_hook.return_value.provide_file
+        with pytest.raises(TaskDeferred):
+            self.operator.execute(context=mock.MagicMock())
+        beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER)
+        gcs_provide_file.assert_called_once_with(object_url=PY_FILE)
+
+    
@mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+    def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, 
beam_hook_mock, persist_link_mock):
+        """
+        Test DataflowHook is created and the right args are passed to
+        start_python_dataflow when executing Dataflow runner.
+        """
+
+        dataflow_config = 
DataflowConfiguration(impersonation_chain=TEST_IMPERSONATION_ACCOUNT)
+        self.operator.runner = "DataflowRunner"
+        self.operator.dataflow_config = dataflow_config
+        gcs_provide_file = gcs_hook.return_value.provide_file
+        magic_mock = mock.MagicMock()
+        with pytest.raises(TaskDeferred):
+            self.operator.execute(context=magic_mock)
+
+        job_name = dataflow_hook_mock.build_dataflow_job_name.return_value
+        dataflow_hook_mock.assert_called_once_with(
+            gcp_conn_id=dataflow_config.gcp_conn_id,
+            poll_sleep=dataflow_config.poll_sleep,
+            impersonation_chain=dataflow_config.impersonation_chain,
+            drain_pipeline=dataflow_config.drain_pipeline,
+            cancel_timeout=dataflow_config.cancel_timeout,
+            wait_until_finished=dataflow_config.wait_until_finished,
+        )
+        expected_options = {
+            "project": dataflow_hook_mock.return_value.project_id,
+            "job_name": job_name,
+            "staging_location": "gs://test/staging",
+            "output": "gs://test/output",
+            "labels": {"foo": "bar", "airflow-version": TEST_VERSION},
+            "region": "us-central1",
+            "impersonate_service_account": TEST_IMPERSONATION_ACCOUNT,
+        }
+        gcs_provide_file.assert_called_once_with(object_url=PY_FILE)
+        persist_link_mock.assert_called_once_with(
+            self.operator,
+            magic_mock,
+            expected_options["project"],
+            expected_options["region"],
+            self.operator.dataflow_job_id,
+        )
+        beam_hook_mock.return_value.start_python_pipeline.assert_not_called()
+        
dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with()
+
+    
@mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook")
+    def test_on_kill_dataflow_runner(self, dataflow_hook_mock, _, __, ___):
+        self.operator.runner = "DataflowRunner"
+        dataflow_cancel_job = dataflow_hook_mock.return_value.cancel_job
+        with pytest.raises(TaskDeferred):
+            self.operator.execute(context=mock.MagicMock())
+        self.operator.dataflow_job_id = JOB_ID
+        self.operator.on_kill()
+        dataflow_cancel_job.assert_called_once_with(
+            job_id=JOB_ID, project_id=self.operator.dataflow_config.project_id
+        )
+
+    @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook")
+    @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook")
+    def test_on_kill_direct_runner(self, _, dataflow_mock, __):
+        dataflow_cancel_job = dataflow_mock.return_value.cancel_job
+        with pytest.raises(TaskDeferred):
+            self.operator.execute(mock.MagicMock())
+        self.operator.on_kill()
+        dataflow_cancel_job.assert_not_called()
diff --git a/tests/providers/apache/beam/triggers/__init__.py 
b/tests/providers/apache/beam/triggers/__init__.py
new file mode 100644
index 0000000000..13a83393a9
--- /dev/null
+++ b/tests/providers/apache/beam/triggers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/beam/triggers/test_beam.py 
b/tests/providers/apache/beam/triggers/test_beam.py
new file mode 100644
index 0000000000..ce22dda215
--- /dev/null
+++ b/tests/providers/apache/beam/triggers/test_beam.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger
+from airflow.triggers.base import TriggerEvent
+
+HOOK_STATUS_STR = 
"airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.start_python_pipeline_async"
+CLASSPATH = "airflow.providers.apache.beam.triggers.beam.BeamPipelineTrigger"
+
+TASK_ID = "test_task"
+LOCATION = "test-location"
+INSTANCE_NAME = "airflow-test-instance"
+INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME}
+PROJECT_ID = "test_project_id"
+TEST_VARIABLES = {"output": "gs://bucket_test/output", "labels": 
{"airflow-version": "v2-7-0-dev0"}}
+TEST_PY_FILE = "apache_beam.examples.wordcount"
+TEST_PY_OPTIONS = []
+TEST_PY_INTERPRETER = "python3"
+TEST_PY_REQUIREMENTS = ["apache-beam[gcp]==2.46.0"]
+TEST_PY_PACKAGES = False
+TEST_RUNNER = "DirectRunner"
+
+
+@pytest.fixture
+def trigger():
+    return BeamPipelineTrigger(
+        variables=TEST_VARIABLES,
+        py_file=TEST_PY_FILE,
+        py_options=TEST_PY_OPTIONS,
+        py_interpreter=TEST_PY_INTERPRETER,
+        py_requirements=TEST_PY_REQUIREMENTS,
+        py_system_site_packages=TEST_PY_PACKAGES,
+        runner=TEST_RUNNER,
+    )
+
+
+class TestBeamPipelineTrigger:
+    def test_beam_trigger_serialization_should_execute_successfully(self, 
trigger):
+        """
+        Asserts that the BeamPipelineTrigger correctly serializes its arguments
+        and classpath.
+        """
+        classpath, kwargs = trigger.serialize()
+        assert classpath == CLASSPATH
+        assert kwargs == {
+            "variables": TEST_VARIABLES,
+            "py_file": TEST_PY_FILE,
+            "py_options": TEST_PY_OPTIONS,
+            "py_interpreter": TEST_PY_INTERPRETER,
+            "py_requirements": TEST_PY_REQUIREMENTS,
+            "py_system_site_packages": TEST_PY_PACKAGES,
+            "runner": TEST_RUNNER,
+        }
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STATUS_STR)
+    async def test_beam_trigger_on_success_should_execute_successfully(self, 
mock_pipeline_status, trigger):
+        """
+        Tests the BeamPipelineTrigger only fires once the job execution 
reaches a successful state.
+        """
+        mock_pipeline_status.return_value = 0
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "success", "message": "Pipeline has 
finished SUCCESSFULLY"}) == actual
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STATUS_STR)
+    async def test_beam_trigger_error_should_execute_successfully(self, 
mock_pipeline_status, trigger):
+        """
+        Test that BeamPipelineTrigger fires the correct event in case of an 
error.
+        """
+        mock_pipeline_status.return_value = 1
+
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "Operation 
failed"}) == actual
+
+    @pytest.mark.asyncio
+    @mock.patch(HOOK_STATUS_STR)
+    async def test_beam_trigger_exception_should_execute_successfully(self, 
mock_pipeline_status, trigger):
+        """
+        Test that BeamPipelineTrigger fires the correct event in case of an exception.
+        """
+        mock_pipeline_status.side_effect = Exception("Test exception")
+
+        generator = trigger.run()
+        actual = await generator.asend(None)
+        assert TriggerEvent({"status": "error", "message": "Test exception"}) 
== actual
diff --git a/tests/system/providers/apache/beam/example_python.py 
b/tests/system/providers/apache/beam/example_python.py
index 50c469c73d..929844ec54 100644
--- a/tests/system/providers/apache/beam/example_python.py
+++ b/tests/system/providers/apache/beam/example_python.py
@@ -47,7 +47,7 @@ with models.DAG(
         task_id="start_python_pipeline_local_direct_runner",
         py_file="apache_beam.examples.wordcount",
         py_options=["-m"],
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
     )
@@ -59,7 +59,7 @@ with models.DAG(
         py_file=GCS_PYTHON,
         py_options=[],
         pipeline_options={"output": GCS_OUTPUT},
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
     )
@@ -76,7 +76,7 @@ with models.DAG(
             "output": GCS_OUTPUT,
         },
         py_options=[],
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
         dataflow_config=DataflowConfiguration(
@@ -90,7 +90,7 @@ with models.DAG(
         py_file="apache_beam.examples.wordcount",
         runner="SparkRunner",
         py_options=["-m"],
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
     )
@@ -103,7 +103,7 @@ with models.DAG(
         pipeline_options={
             "output": "/tmp/start_python_pipeline_local_flink_runner",
         },
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
     )
@@ -113,6 +113,7 @@ with models.DAG(
             start_python_pipeline_local_direct_runner,
             start_python_pipeline_direct_runner,
         ]
+        >> start_python_pipeline_dataflow_runner
         >> start_python_pipeline_local_flink_runner
         >> start_python_pipeline_local_spark_runner
     )
diff --git a/tests/system/providers/apache/beam/example_python.py 
b/tests/system/providers/apache/beam/example_python_async.py
similarity index 82%
copy from tests/system/providers/apache/beam/example_python.py
copy to tests/system/providers/apache/beam/example_python_async.py
index 50c469c73d..520e8adc4c 100644
--- a/tests/system/providers/apache/beam/example_python.py
+++ b/tests/system/providers/apache/beam/example_python_async.py
@@ -34,7 +34,7 @@ from tests.system.providers.apache.beam.utils import (
 )
 
 with models.DAG(
-    "example_beam_native_python",
+    dag_id="dataflow_native_python_async",
     start_date=START_DATE,
     schedule=None,  # Override to match your needs
     catchup=False,
@@ -42,30 +42,32 @@ with models.DAG(
     tags=["example"],
 ) as dag:
 
-    # [START howto_operator_start_python_direct_runner_pipeline_local_file]
+    # [START howto_operator_start_python_direct_runner_pipeline_local_file_async]
     start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
         task_id="start_python_pipeline_local_direct_runner",
         py_file="apache_beam.examples.wordcount",
         py_options=["-m"],
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
+        deferrable=True,
     )
-    # [END howto_operator_start_python_direct_runner_pipeline_local_file]
+    # [END howto_operator_start_python_direct_runner_pipeline_local_file_async]
 
-    # [START howto_operator_start_python_direct_runner_pipeline_gcs_file]
+    # [START howto_operator_start_python_direct_runner_pipeline_gcs_file_async]
     start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
         task_id="start_python_pipeline_direct_runner",
         py_file=GCS_PYTHON,
         py_options=[],
         pipeline_options={"output": GCS_OUTPUT},
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
+        deferrable=True,
     )
-    # [END howto_operator_start_python_direct_runner_pipeline_gcs_file]
+    # [END howto_operator_start_python_direct_runner_pipeline_gcs_file_async]
 
-    # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
+    # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async]
     start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
         task_id="start_python_pipeline_dataflow_runner",
         runner="DataflowRunner",
@@ -76,25 +78,30 @@ with models.DAG(
             "output": GCS_OUTPUT,
         },
         py_options=[],
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
         dataflow_config=DataflowConfiguration(
             job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, 
location="us-central1"
         ),
+        deferrable=True,
     )
-    # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file]
+    # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async]
 
+    # [START howto_operator_start_python_pipeline_local_runner_spark_runner_async]
     start_python_pipeline_local_spark_runner = BeamRunPythonPipelineOperator(
         task_id="start_python_pipeline_local_spark_runner",
         py_file="apache_beam.examples.wordcount",
         runner="SparkRunner",
         py_options=["-m"],
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
+        deferrable=True,
     )
+    # [END howto_operator_start_python_pipeline_local_runner_spark_runner_async]
 
+    # [START howto_operator_start_python_pipeline_local_runner_flink_runner_async]
     start_python_pipeline_local_flink_runner = BeamRunPythonPipelineOperator(
         task_id="start_python_pipeline_local_flink_runner",
         py_file="apache_beam.examples.wordcount",
@@ -103,16 +110,19 @@ with models.DAG(
         pipeline_options={
             "output": "/tmp/start_python_pipeline_local_flink_runner",
         },
-        py_requirements=["apache-beam[gcp]==2.26.0"],
+        py_requirements=["apache-beam[gcp]==2.46.0"],
         py_interpreter="python3",
         py_system_site_packages=False,
+        deferrable=True,
     )
+    # [END howto_operator_start_python_pipeline_local_runner_flink_runner_async]
 
     (
         [
             start_python_pipeline_local_direct_runner,
             start_python_pipeline_direct_runner,
         ]
+        >> start_python_pipeline_dataflow_runner
         >> start_python_pipeline_local_flink_runner
         >> start_python_pipeline_local_spark_runner
     )
