This is an automated email from the ASF dual-hosted git repository. pankaj pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 44b4a3755d Add deferrable mode to `BeamRunPythonPipelineOperator` (#31471) 44b4a3755d is described below commit 44b4a3755deea23f8ff0eb1316db880b1d64812c Author: VladaZakharova <80038284+vladazakhar...@users.noreply.github.com> AuthorDate: Thu Jul 13 18:37:11 2023 +0200 Add deferrable mode to `BeamRunPythonPipelineOperator` (#31471) * Add deferrable mode to BeamRunPythonPipelineOperator * Fix docs * Fix deferrable parameter in init * fix system test --- airflow/providers/apache/beam/hooks/beam.py | 190 ++++++++++++++++++++- airflow/providers/apache/beam/operators/beam.py | 113 +++++++++--- airflow/providers/apache/beam/provider.yaml | 5 + airflow/providers/apache/beam/triggers/__init__.py | 16 ++ airflow/providers/apache/beam/triggers/beam.py | 116 +++++++++++++ .../operators.rst | 29 ++++ tests/providers/apache/beam/hooks/test_beam.py | 177 ++++++++++++++++++- tests/providers/apache/beam/operators/test_beam.py | 155 +++++++++++++++-- tests/providers/apache/beam/triggers/__init__.py | 16 ++ tests/providers/apache/beam/triggers/test_beam.py | 107 ++++++++++++ .../system/providers/apache/beam/example_python.py | 11 +- .../{example_python.py => example_python_async.py} | 34 ++-- 12 files changed, 911 insertions(+), 58 deletions(-) diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py index 264d9076e0..d92ade2050 100644 --- a/airflow/providers/apache/beam/hooks/beam.py +++ b/airflow/providers/apache/beam/hooks/beam.py @@ -18,8 +18,10 @@ """This module contains a Apache Beam Hook.""" from __future__ import annotations +import asyncio import contextlib import copy +import functools import json import logging import os @@ -27,8 +29,8 @@ import select import shlex import shutil import subprocess +import tempfile import textwrap -from tempfile import TemporaryDirectory from typing import Callable from packaging.version import Version @@ -222,7 +224,7 @@ class BeamHook(BaseHook): If a value is passed to this parameter, a new virtual environment has been created with additional packages installed. - You could also install the apache-beam package if it is not installed on your system or you want + You could also install the apache-beam package if it is not installed on your system, or you want to use a different version. :param py_system_site_packages: Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information. @@ -251,7 +253,7 @@ class BeamHook(BaseHook): """ ) raise AirflowException(warning_invalid_environment) - tmp_dir = exit_stack.enter_context(TemporaryDirectory(prefix="apache-beam-venv")) + tmp_dir = exit_stack.enter_context(tempfile.TemporaryDirectory(prefix="apache-beam-venv")) py_interpreter = prepare_virtualenv( venv_directory=tmp_dir, python_bin=py_interpreter, @@ -381,3 +383,185 @@ class BeamHook(BaseHook): command_prefix=command_prefix, process_line_callback=process_line_callback, ) + + +class BeamAsyncHook(BeamHook): + """ + Asynchronous hook for Apache Beam. + :param runner: Runner type. 
+ """ + + def __init__( + self, + runner: str, + ) -> None: + self.runner = runner + super().__init__(runner=self.runner) + + @staticmethod + async def _create_tmp_dir(prefix: str) -> str: + """Helper method to create temporary directory.""" + # Creating separate thread to create temporary directory + loop = asyncio.get_running_loop() + partial_func = functools.partial(tempfile.mkdtemp, prefix=prefix) + tmp_dir = await loop.run_in_executor(None, partial_func) + return tmp_dir + + @staticmethod + async def _cleanup_tmp_dir(tmp_dir: str) -> None: + """ + Helper method to delete temporary directory after finishing work with it. + Is uses `rmtree` method to recursively remove the temporary directory. + """ + shutil.rmtree(tmp_dir) + + async def start_python_pipeline_async( + self, + variables: dict, + py_file: str, + py_options: list[str] | None = None, + py_interpreter: str = "python3", + py_requirements: list[str] | None = None, + py_system_site_packages: bool = False, + ): + """ + Starts Apache Beam python pipeline. + + :param variables: Variables passed to the pipeline. + :param py_file: Path to the python file to execute. + :param py_options: Additional options. + :param py_interpreter: Python version of the Apache Beam pipeline. + If None, this defaults to the python3. + To track python versions supported by beam and related + issues check: https://issues.apache.org/jira/browse/BEAM-1251 + :param py_requirements: Additional python package(s) to install. + If a value is passed to this parameter, a new virtual environment has been created with + additional packages installed. + You could also install the apache-beam package if it is not installed on your system, or you want + to use a different version. + :param py_system_site_packages: Whether to include system_site_packages in your virtualenv. + See virtualenv documentation for more information. + This option is only relevant if the ``py_requirements`` parameter is not None. + """ + if "labels" in variables: + variables["labels"] = [f"{key}={value}" for key, value in variables["labels"].items()] + + # Creating temporary directory + tmp_dir = await self._create_tmp_dir(prefix="apache-beam-venv") + + async with contextlib.AsyncExitStack() as exit_stack: + if py_requirements is not None: + if not py_requirements and not py_system_site_packages: + warning_invalid_environment = textwrap.dedent( + """\ + Invalid method invocation. You have disabled inclusion of system packages and empty + list required for installation, so it is not possible to create a valid virtual + environment. In the virtual environment, apache-beam package must be installed for + your job to be executed. + + To fix this problem: + * install apache-beam on the system, then set parameter py_system_site_packages + to True, + * add apache-beam to the list of required packages in parameter py_requirements. 
+ """ + ) + raise AirflowException(warning_invalid_environment) + + # Pushing asynchronous callback to ensure the cleanup of the temporary + # directory when the asynchronous context is exited + exit_stack.push_async_callback(self._cleanup_tmp_dir, tmp_dir) + + py_interpreter = prepare_virtualenv( + venv_directory=tmp_dir, + python_bin=py_interpreter, + system_site_packages=py_system_site_packages, + requirements=py_requirements, + ) + command_prefix: list[str] = [py_interpreter] + (py_options or []) + [py_file] + beam_version = ( + subprocess.check_output( + [py_interpreter, "-c", "import apache_beam; print(apache_beam.__version__)"] + ) + .decode() + .strip() + ) + self.log.info("Beam version: %s", beam_version) + impersonate_service_account = variables.get("impersonate_service_account") + if impersonate_service_account: + if Version(beam_version) < Version("2.39.0") or True: + raise AirflowException( + "The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer." + ) + return_code = await self.start_pipeline_async( + variables=variables, + command_prefix=command_prefix, + ) + return return_code + + async def start_pipeline_async( + self, + variables: dict, + command_prefix: list[str], + working_directory: str | None = None, + ) -> int: + cmd = command_prefix + [ + f"--runner={self.runner}", + ] + if variables: + cmd.extend(beam_options_to_args(variables)) + return await self.run_beam_command_async( + cmd=cmd, + working_directory=working_directory, + log=self.log, + ) + + async def run_beam_command_async( + self, + cmd: list[str], + log: logging.Logger, + working_directory: str | None = None, + ) -> int: + """ + Function responsible for running pipeline command in subprocess. + + :param cmd: Parts of the command to be run in subprocess + :param working_directory: Working directory + :param log: logger. + """ + cmd_str_representation = " ".join(shlex.quote(c) for c in cmd) + log.info("Running command: %s", cmd_str_representation) + + # Creating a separate asynchronous process + process = await asyncio.create_subprocess_shell( + cmd_str_representation, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + close_fds=True, + cwd=working_directory, + ) + # Waits for Apache Beam pipeline to complete. 
+ log.info("Start waiting for Apache Beam process to complete.") + + # Creating separate threads for stdout and stderr + stdout_task = asyncio.create_task(self.read_logs(process.stdout)) + stderr_task = asyncio.create_task(self.read_logs(process.stderr)) + + # Waiting for the both tasks to complete + await asyncio.gather(stdout_task, stderr_task) + + # Wait for the process to complete and return return_code + return_code = await process.wait() + log.info("Process exited with return code: %s", return_code) + + if return_code != 0: + raise AirflowException(f"Apache Beam process failed with return code {return_code}") + return return_code + + async def read_logs(self, stream_reader): + while True: + line = await stream_reader.readline() + if not line: + break + decoded_line = line.decode().strip() + self.log.info(decoded_line) diff --git a/airflow/providers/apache/beam/operators/beam.py b/airflow/providers/apache/beam/operators/beam.py index 68e6972547..cc7d217abb 100644 --- a/airflow/providers/apache/beam/operators/beam.py +++ b/airflow/providers/apache/beam/operators/beam.py @@ -18,6 +18,8 @@ """This module contains Apache Beam operators.""" from __future__ import annotations +import asyncio +import contextlib import copy import os import stat @@ -26,11 +28,13 @@ from abc import ABC, ABCMeta, abstractmethod from concurrent.futures import ThreadPoolExecutor, as_completed from contextlib import ExitStack from functools import partial -from typing import TYPE_CHECKING, Callable, Sequence +from typing import IO, TYPE_CHECKING, Any, Callable, Sequence from airflow import AirflowException +from airflow.configuration import conf from airflow.models import BaseOperator from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType +from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger from airflow.providers.google.cloud.hooks.dataflow import ( DataflowHook, process_line_and_extract_dataflow_job_id_callback, @@ -144,7 +148,7 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC): When defining labels (labels option), you can also provide a dictionary. :param gcp_conn_id: Optional. The connection ID to use connecting to Google Cloud Storage if python file is on GCS. - :param dataflow_config: Dataflow configuration, used when runner type is set to DataflowRunner, + :param dataflow_config: Dataflow's configuration, used when runner type is set to DataflowRunner, (optional) defaults to None. """ @@ -167,7 +171,7 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC): self.dataflow_config = DataflowConfiguration(**dataflow_config) else: self.dataflow_config = dataflow_config or DataflowConfiguration() - self.beam_hook: BeamHook | None = None + self.beam_hook: BeamHook self.dataflow_hook: DataflowHook | None = None self.dataflow_job_id: str | None = None @@ -237,8 +241,8 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator): to use a different version. :param py_system_site_packages: Whether to include system_site_packages in your virtualenv. See virtualenv documentation for more information. - This option is only relevant if the ``py_requirements`` parameter is not None. + :param deferrable: Run operator in the deferrable mode: checks for the state using asynchronous calls. 
""" template_fields: Sequence[str] = ( @@ -264,6 +268,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator): py_system_site_packages: bool = False, gcp_conn_id: str = "google_cloud_default", dataflow_config: DataflowConfiguration | dict | None = None, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs, ) -> None: super().__init__( @@ -283,35 +288,42 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator): self.pipeline_options.setdefault("labels", {}).update( {"airflow-version": "v" + version.replace(".", "-").replace("+", "-")} ) + self.deferrable = deferrable def execute(self, context: Context): """Execute the Apache Beam Pipeline.""" ( - is_dataflow, - dataflow_job_name, - snake_case_pipeline_options, - process_line_callback, + self.is_dataflow, + self.dataflow_job_name, + self.snake_case_pipeline_options, + self.process_line_callback, ) = self._init_pipeline_options(format_pipeline_options=True, job_name_variable_key="job_name") - if not self.beam_hook: raise AirflowException("Beam hook is not defined.") + # Check deferrable parameter passed to the operator + # to determine type of run - asynchronous or synchronous + if self.deferrable: + asyncio.run(self.execute_async(context)) + else: + return self.execute_sync(context) + def execute_sync(self, context: Context): with ExitStack() as exit_stack: if self.py_file.lower().startswith("gs://"): gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id) tmp_gcs_file = exit_stack.enter_context(gcs_hook.provide_file(object_url=self.py_file)) self.py_file = tmp_gcs_file.name - if is_dataflow and self.dataflow_hook: + if self.is_dataflow and self.dataflow_hook: with self.dataflow_hook.provide_authorized_gcloud(): self.beam_hook.start_python_pipeline( - variables=snake_case_pipeline_options, + variables=self.snake_case_pipeline_options, py_file=self.py_file, py_options=self.py_options, py_interpreter=self.py_interpreter, py_requirements=self.py_requirements, py_system_site_packages=self.py_system_site_packages, - process_line_callback=process_line_callback, + process_line_callback=self.process_line_callback, ) DataflowJobLink.persist( self, @@ -320,25 +332,82 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator): self.dataflow_config.location, self.dataflow_job_id, ) - if dataflow_job_name and self.dataflow_config.location: - self.dataflow_hook.wait_for_done( - job_name=dataflow_job_name, - location=self.dataflow_config.location, - job_id=self.dataflow_job_id, - multiple_jobs=False, - project_id=self.dataflow_config.project_id, - ) return {"dataflow_job_id": self.dataflow_job_id} else: self.beam_hook.start_python_pipeline( - variables=snake_case_pipeline_options, + variables=self.snake_case_pipeline_options, py_file=self.py_file, py_options=self.py_options, py_interpreter=self.py_interpreter, py_requirements=self.py_requirements, py_system_site_packages=self.py_system_site_packages, - process_line_callback=process_line_callback, + process_line_callback=self.process_line_callback, + ) + + async def execute_async(self, context: Context): + # Creating a new event loop to manage I/O operations asynchronously + loop = asyncio.get_event_loop() + if self.py_file.lower().startswith("gs://"): + gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id) + # Running synchronous `enter_context()` method in a separate + # thread using the default executor `None`. The `run_in_executor()` function returns the + # file object, which is created using gcs function `provide_file()`, asynchronously. 
+ # This means we can perform asynchronous operations with this file. + create_tmp_file_call = gcs_hook.provide_file(object_url=self.py_file) + tmp_gcs_file: IO[str] = await loop.run_in_executor( + None, contextlib.ExitStack().enter_context, create_tmp_file_call + ) + self.py_file = tmp_gcs_file.name + + if self.is_dataflow and self.dataflow_hook: + DataflowJobLink.persist( + self, + context, + self.dataflow_config.project_id, + self.dataflow_config.location, + self.dataflow_job_id, + ) + with self.dataflow_hook.provide_authorized_gcloud(): + self.defer( + trigger=BeamPipelineTrigger( + variables=self.snake_case_pipeline_options, + py_file=self.py_file, + py_options=self.py_options, + py_interpreter=self.py_interpreter, + py_requirements=self.py_requirements, + py_system_site_packages=self.py_system_site_packages, + runner=self.runner, + ), + method_name="execute_complete", ) + else: + self.defer( + trigger=BeamPipelineTrigger( + variables=self.snake_case_pipeline_options, + py_file=self.py_file, + py_options=self.py_options, + py_interpreter=self.py_interpreter, + py_requirements=self.py_requirements, + py_system_site_packages=self.py_system_site_packages, + runner=self.runner, + ), + method_name="execute_complete", + ) + + def execute_complete(self, context: Context, event: dict[str, Any]): + """ + Callback for when the trigger fires - returns immediately. + Relies on trigger to throw an exception, otherwise it assumes execution was + successful. + """ + if event["status"] == "error": + raise AirflowException(event["message"]) + self.log.info( + "%s completed with response %s ", + self.task_id, + event["message"], + ) + return {"dataflow_job_id": self.dataflow_job_id} def on_kill(self) -> None: if self.dataflow_hook and self.dataflow_job_id: diff --git a/airflow/providers/apache/beam/provider.yaml b/airflow/providers/apache/beam/provider.yaml index d177e251d7..dbc59a95af 100644 --- a/airflow/providers/apache/beam/provider.yaml +++ b/airflow/providers/apache/beam/provider.yaml @@ -63,6 +63,11 @@ hooks: python-modules: - airflow.providers.apache.beam.hooks.beam +triggers: + - integration-name: Apache Beam + python-modules: + - airflow.providers.apache.beam.triggers.beam + additional-extras: - name: google dependencies: diff --git a/airflow/providers/apache/beam/triggers/__init__.py b/airflow/providers/apache/beam/triggers/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/airflow/providers/apache/beam/triggers/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
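For readers following the operator changes above: the new `execute_async` / `execute_complete` pair implements Airflow's standard deferral protocol. `self.defer(...)` raises `TaskDeferred`, which releases the worker slot; the triggerer process then runs the trigger's `run()` coroutine, and the event it yields is passed back to the method named in `method_name`. The sketch below shows that protocol in its minimal form; it is illustrative only (names such as `WaitTrigger` and the classpath in `serialize()` are hypothetical, not part of this commit).

import asyncio
from typing import Any, AsyncIterator

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class WaitTrigger(BaseTrigger):
    """Illustrative trigger: waits asynchronously, then reports success."""

    def __init__(self, seconds: int):
        super().__init__()
        self.seconds = seconds

    def serialize(self) -> tuple[str, dict[str, Any]]:
        # The triggerer re-instantiates the trigger from this classpath and
        # kwargs, so the arguments must be serializable.
        return ("my_dags.triggers.WaitTrigger", {"seconds": self.seconds})

    async def run(self) -> AsyncIterator[TriggerEvent]:
        # Runs in the triggerer's event loop; no worker slot is held here.
        await asyncio.sleep(self.seconds)
        yield TriggerEvent({"status": "success", "message": "done waiting"})


class WaitOperator(BaseOperator):
    """Illustrative operator that defers instead of blocking a worker."""

    def execute(self, context):
        # Raises TaskDeferred: the task leaves the worker until the trigger fires.
        self.defer(trigger=WaitTrigger(seconds=60), method_name="execute_complete")

    def execute_complete(self, context, event: dict[str, Any]):
        # Re-invoked on a worker once the trigger yields an event.
        if event["status"] == "error":
            raise AirflowException(event["message"])
        return event["message"]

The `BeamPipelineTrigger` introduced in the next file has exactly this shape: `serialize()` returns its classpath plus keyword arguments, and `run()` yields a success or error `TriggerEvent` that `execute_complete` inspects.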
diff --git a/airflow/providers/apache/beam/triggers/beam.py b/airflow/providers/apache/beam/triggers/beam.py new file mode 100644 index 0000000000..2926a17b45 --- /dev/null +++ b/airflow/providers/apache/beam/triggers/beam.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Any, AsyncIterator + +from airflow.providers.apache.beam.hooks.beam import BeamAsyncHook +from airflow.triggers.base import BaseTrigger, TriggerEvent + + +class BeamPipelineTrigger(BaseTrigger): + """ + Trigger that checks the pipeline status until it reaches a terminal state. + + :param variables: Variables passed to the pipeline. + :param py_file: Path to the python file to execute. + :param py_options: Additional options. + :param py_interpreter: Python version of the Apache Beam pipeline. If `None`, this defaults to the + python3. To track python versions supported by beam and related issues + check: https://issues.apache.org/jira/browse/BEAM-1251 + :param py_requirements: Additional python package(s) to install. + If a value is passed to this parameter, a new virtual environment has been created with + additional packages installed. + + You could also install the apache-beam package if it is not installed on your system, or you want + to use a different version. + :param py_system_site_packages: Whether to include system_site_packages in your virtualenv. + See virtualenv documentation for more information. + + This option is only relevant if the ``py_requirements`` parameter is not None. + :param runner: Runner on which pipeline will be run. By default, "DirectRunner" is being used. + Other possible options: DataflowRunner, SparkRunner, FlinkRunner, PortableRunner.
+ See: :class:`~providers.apache.beam.hooks.beam.BeamRunnerType` + See: https://beam.apache.org/documentation/runners/capability-matrix/ + """ + + def __init__( + self, + variables: dict, + py_file: str, + py_options: list[str] | None = None, + py_interpreter: str = "python3", + py_requirements: list[str] | None = None, + py_system_site_packages: bool = False, + runner: str = "DirectRunner", + ): + super().__init__() + self.variables = variables + self.py_file = py_file + self.py_options = py_options + self.py_interpreter = py_interpreter + self.py_requirements = py_requirements + self.py_system_site_packages = py_system_site_packages + self.runner = runner + + def serialize(self) -> tuple[str, dict[str, Any]]: + """Serializes BeamPipelineTrigger arguments and classpath.""" + return ( + "airflow.providers.apache.beam.triggers.beam.BeamPipelineTrigger", + { + "variables": self.variables, + "py_file": self.py_file, + "py_options": self.py_options, + "py_interpreter": self.py_interpreter, + "py_requirements": self.py_requirements, + "py_system_site_packages": self.py_system_site_packages, + "runner": self.runner, + }, + ) + + async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override] + """Gets current pipeline status and yields a TriggerEvent.""" + hook = self._get_async_hook() + while True: + try: + return_code = await hook.start_python_pipeline_async( + variables=self.variables, + py_file=self.py_file, + py_options=self.py_options, + py_interpreter=self.py_interpreter, + py_requirements=self.py_requirements, + py_system_site_packages=self.py_system_site_packages, + ) + if return_code == 0: + yield TriggerEvent( + { + "status": "success", + "message": "Pipeline has finished SUCCESSFULLY", + } + ) + return + else: + yield TriggerEvent({"status": "error", "message": "Operation failed"}) + return + + except Exception as e: + self.log.exception("Exception occurred while checking for pipeline state") + yield TriggerEvent({"status": "error", "message": str(e)}) + return + + def _get_async_hook(self) -> BeamAsyncHook: + return BeamAsyncHook(runner=self.runner) diff --git a/docs/apache-airflow-providers-apache-beam/operators.rst b/docs/apache-airflow-providers-apache-beam/operators.rst index fedf6b0265..f7487fb264 100644 --- a/docs/apache-airflow-providers-apache-beam/operators.rst +++ b/docs/apache-airflow-providers-apache-beam/operators.rst @@ -61,6 +61,23 @@ Python Pipelines with DirectRunner :start-after: [START howto_operator_start_python_direct_runner_pipeline_gcs_file] :end-before: [END howto_operator_start_python_direct_runner_pipeline_gcs_file] +You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a +possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. +As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a +lot less resources wasted on idle Operators or Sensors: + +.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_start_python_direct_runner_pipeline_local_file_async] + :end-before: [END howto_operator_start_python_direct_runner_pipeline_local_file_async] + +.. 
exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_start_python_direct_runner_pipeline_gcs_file_async] + :end-before: [END howto_operator_start_python_direct_runner_pipeline_gcs_file_async] + Python Pipelines with DataflowRunner ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -76,6 +93,18 @@ Python Pipelines with DataflowRunner :start-after: [START howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file] :end-before: [END howto_operator_start_python_dataflow_runner_pipeline_async_gcs_file] + +You can use deferrable mode for this action in order to run the operator asynchronously. It will give you a +possibility to free up the worker when it knows it has to wait, and hand off the job of resuming Operator to a Trigger. +As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a +lot less resources wasted on idle Operators or Sensors: + +.. exampleinclude:: /../../tests/system/providers/apache/beam/example_python_async.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async] + :end-before: [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async] + | | diff --git a/tests/providers/apache/beam/hooks/test_beam.py b/tests/providers/apache/beam/hooks/test_beam.py index 3e3e895cf9..1cfe71121c 100644 --- a/tests/providers/apache/beam/hooks/test_beam.py +++ b/tests/providers/apache/beam/hooks/test_beam.py @@ -21,12 +21,17 @@ import os import re import subprocess from unittest import mock -from unittest.mock import ANY, MagicMock +from unittest.mock import ANY, AsyncMock, MagicMock import pytest from airflow.exceptions import AirflowException -from airflow.providers.apache.beam.hooks.beam import BeamHook, beam_options_to_args, run_beam_command +from airflow.providers.apache.beam.hooks.beam import ( + BeamAsyncHook, + BeamHook, + beam_options_to_args, + run_beam_command, +) PY_FILE = "apache_beam.examples.wordcount" JAR_FILE = "unitest.jar" @@ -391,3 +396,171 @@ class TestBeamOptionsToArgs: def test_beam_options_to_args(self, options, expected_args): args = beam_options_to_args(options) assert args == expected_args + + +class TestBeamAsyncHook: + @pytest.mark.asyncio + @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async") + @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir") + async def test_start_python_pipeline(self, mock_create_dir, mock_runner): + hook = BeamAsyncHook(runner=DEFAULT_RUNNER) + mock_create_dir.return_value = AsyncMock() + mock_runner.return_value = 0 + + await hook.start_python_pipeline_async( + variables=copy.deepcopy(BEAM_VARIABLES_PY), + py_file=PY_FILE, + py_options=PY_OPTIONS, + ) + + expected_cmd = [ + "python3", + "-m", + PY_FILE, + f"--runner={DEFAULT_RUNNER}", + "--output=gs://test/output", + "--labels=foo=bar", + ] + mock_create_dir.assert_called_once() + mock_runner.assert_called_once_with( + cmd=expected_cmd, + working_directory=None, + log=ANY, + ) + + @pytest.mark.asyncio + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.35.0") + async def test_start_python_pipeline_unsupported_option(self, mock_check_output): + hook = BeamAsyncHook(runner=DEFAULT_RUNNER) + + with pytest.raises( + AirflowException, + match=re.escape("The impersonateServiceAccount option requires Apache Beam 2.39.0 or newer."), + ): + await 
hook.start_python_pipeline_async( + variables={ + "impersonate_service_account": "t...@impersonation.com", + }, + py_file="/tmp/file.py", + py_options=["-m"], + py_interpreter="python3", + py_requirements=None, + py_system_site_packages=False, + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "py_interpreter", + [ + pytest.param("python", id="default python"), + pytest.param("python2", id="major python version 2.x"), + pytest.param("python3", id="major python version 3.x"), + pytest.param("python3.6", id="major.minor python version"), + ], + ) + @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async") + @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir") + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + async def test_start_python_pipeline_with_custom_interpreter( + self, mock_check_output, mock_create_dir, mock_runner, py_interpreter + ): + hook = BeamAsyncHook(runner=DEFAULT_RUNNER) + mock_create_dir.return_value = AsyncMock() + mock_runner.return_value = 0 + + await hook.start_python_pipeline_async( + variables=copy.deepcopy(BEAM_VARIABLES_PY), + py_file=PY_FILE, + py_options=PY_OPTIONS, + py_interpreter=py_interpreter, + ) + + expected_cmd = [ + py_interpreter, + "-m", + PY_FILE, + f"--runner={DEFAULT_RUNNER}", + "--output=gs://test/output", + "--labels=foo=bar", + ] + mock_runner.assert_called_once_with( + cmd=expected_cmd, + working_directory=None, + log=ANY, + ) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "current_py_requirements, current_py_system_site_packages", + [ + pytest.param("foo-bar", False, id="requirements without system site-packages"), + pytest.param("foo-bar", True, id="requirements with system site-packages"), + pytest.param([], True, id="only system site-packages"), + ], + ) + @mock.patch(BEAM_STRING.format("prepare_virtualenv")) + @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.run_beam_command_async") + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._create_tmp_dir") + @mock.patch("airflow.providers.apache.beam.hooks.beam.BeamAsyncHook._cleanup_tmp_dir") + async def test_start_python_pipeline_with_non_empty_py_requirements_and_without_system_packages( + self, + mock_cleanup_dir, + mock_create_dir, + mock_check_output, + mock_runner, + mock_virtualenv, + current_py_requirements, + current_py_system_site_packages, + ): + hook = BeamAsyncHook(runner=DEFAULT_RUNNER) + mock_create_dir.return_value = AsyncMock() + mock_virtualenv.return_value = "/dummy_dir/bin/python" + mock_cleanup_dir.return_value = AsyncMock() + + await hook.start_python_pipeline_async( + variables=copy.deepcopy(BEAM_VARIABLES_PY), + py_file=PY_FILE, + py_options=PY_OPTIONS, + py_requirements=current_py_requirements, + py_system_site_packages=current_py_system_site_packages, + ) + + expected_cmd = [ + "/dummy_dir/bin/python", + "-m", + PY_FILE, + f"--runner={DEFAULT_RUNNER}", + "--output=gs://test/output", + "--labels=foo=bar", + ] + mock_runner.assert_called_once_with( + cmd=expected_cmd, + working_directory=None, + log=ANY, + ) + mock_virtualenv.assert_called_once_with( + venv_directory=mock.ANY, + python_bin="python3", + system_site_packages=current_py_system_site_packages, + requirements=current_py_requirements, + ) + mock_create_dir.assert_called_once() + + @pytest.mark.asyncio + 
@mock.patch(BEAM_STRING.format("run_beam_command")) + @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.39.0") + async def test_start_python_pipeline_with_empty_py_requirements_and_without_system_packages( + self, mock_check_output, mock_runner + ): + hook = BeamAsyncHook(runner=DEFAULT_RUNNER) + + with pytest.raises(AirflowException, match=r"Invalid method invocation\."): + await hook.start_python_pipeline_async( + variables=copy.deepcopy(BEAM_VARIABLES_PY), + py_file=PY_FILE, + py_options=PY_OPTIONS, + py_requirements=[], + ) + + mock_runner.assert_not_called() diff --git a/tests/providers/apache/beam/operators/test_beam.py b/tests/providers/apache/beam/operators/test_beam.py index 9bdf016f48..c5949dd868 100644 --- a/tests/providers/apache/beam/operators/test_beam.py +++ b/tests/providers/apache/beam/operators/test_beam.py @@ -22,11 +22,13 @@ from unittest.mock import MagicMock, call import pytest +from airflow.exceptions import AirflowException, TaskDeferred from airflow.providers.apache.beam.operators.beam import ( BeamRunGoPipelineOperator, BeamRunJavaPipelineOperator, BeamRunPythonPipelineOperator, ) +from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration from airflow.version import version @@ -150,13 +152,6 @@ class TestBeamRunPythonPipelineOperator: py_system_site_packages=False, process_line_callback=mock.ANY, ) - dataflow_hook_mock.return_value.wait_for_done.assert_called_once_with( - job_id=self.operator.dataflow_job_id, - job_name=job_name, - location="us-central1", - multiple_jobs=False, - project_id=dataflow_config.project_id, - ) dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with() @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist") @@ -268,13 +263,6 @@ class TestBeamRunJavaPipelineOperator: job_class=JOB_CLASS, process_line_callback=mock.ANY, ) - dataflow_hook_mock.return_value.wait_for_done.assert_called_once_with( - job_id=self.operator.dataflow_job_id, - job_name=job_name, - location="us-central1", - multiple_jobs=False, - project_id=dataflow_hook_mock.return_value.project_id, - ) @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist") @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook") @@ -696,3 +684,142 @@ class TestBeamRunGoPipelineOperator: self.operator.execute(None) self.operator.on_kill() dataflow_cancel_job.assert_not_called() + + +class TestBeamRunPythonPipelineOperatorAsync: + def setup_method(self): + self.operator = BeamRunPythonPipelineOperator( + task_id=TASK_ID, + py_file=PY_FILE, + py_options=PY_OPTIONS, + default_pipeline_options=DEFAULT_OPTIONS, + pipeline_options=ADDITIONAL_OPTIONS, + deferrable=True, + ) + + def test_init(self): + """Test BeamRunPythonPipelineOperator instance is properly initialized.""" + assert self.operator.task_id == TASK_ID + assert self.operator.py_file == PY_FILE + assert self.operator.runner == DEFAULT_RUNNER + assert self.operator.py_options == PY_OPTIONS + assert self.operator.py_interpreter == PY_INTERPRETER + assert self.operator.default_pipeline_options == DEFAULT_OPTIONS + assert self.operator.pipeline_options == EXPECTED_ADDITIONAL_OPTIONS + + @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook") + @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook") + def test_async_execute_should_execute_successfully(self, gcs_hook, beam_hook_mock): 
+ """ + Asserts that a task is deferred and the BeamPipelineTrigger will be fired + when the BeamRunPythonPipelineOperator is executed in deferrable mode when deferrable=True. + """ + with pytest.raises(TaskDeferred) as exc: + self.operator.execute(context=mock.MagicMock()) + + assert isinstance(exc.value.trigger, BeamPipelineTrigger), "Trigger is not a BeamPipelineTrigger" + + def test_async_execute_should_throw_exception(self): + """Tests that an AirflowException is raised in case of error event""" + + with pytest.raises(AirflowException): + self.operator.execute_complete( + context=mock.MagicMock(), event={"status": "error", "message": "test failure message"} + ) + + def test_async_execute_logging_should_execute_successfully(self): + """Asserts that logging occurs as expected""" + + with mock.patch.object(self.operator.log, "info") as mock_log_info: + self.operator.execute_complete( + context=mock.MagicMock(), + event={"status": "success", "message": "Pipeline has finished SUCCESSFULLY"}, + ) + mock_log_info.assert_called_with( + "%s completed with response %s ", TASK_ID, "Pipeline has finished SUCCESSFULLY" + ) + + @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook") + @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook") + def test_async_execute_direct_runner(self, gcs_hook, beam_hook_mock): + """ + Test BeamHook is created and the right args are passed to + start_python_workflow when executing direct runner. + """ + gcs_provide_file = gcs_hook.return_value.provide_file + with pytest.raises(TaskDeferred): + self.operator.execute(context=mock.MagicMock()) + beam_hook_mock.assert_called_once_with(runner=DEFAULT_RUNNER) + gcs_provide_file.assert_called_once_with(object_url=PY_FILE) + + @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist") + @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook") + @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook") + @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook") + def test_exec_dataflow_runner(self, gcs_hook, dataflow_hook_mock, beam_hook_mock, persist_link_mock): + """ + Test DataflowHook is created and the right args are passed to + start_python_dataflow when executing Dataflow runner. 
+ """ + + dataflow_config = DataflowConfiguration(impersonation_chain=TEST_IMPERSONATION_ACCOUNT) + self.operator.runner = "DataflowRunner" + self.operator.dataflow_config = dataflow_config + gcs_provide_file = gcs_hook.return_value.provide_file + magic_mock = mock.MagicMock() + with pytest.raises(TaskDeferred): + self.operator.execute(context=magic_mock) + + job_name = dataflow_hook_mock.build_dataflow_job_name.return_value + dataflow_hook_mock.assert_called_once_with( + gcp_conn_id=dataflow_config.gcp_conn_id, + poll_sleep=dataflow_config.poll_sleep, + impersonation_chain=dataflow_config.impersonation_chain, + drain_pipeline=dataflow_config.drain_pipeline, + cancel_timeout=dataflow_config.cancel_timeout, + wait_until_finished=dataflow_config.wait_until_finished, + ) + expected_options = { + "project": dataflow_hook_mock.return_value.project_id, + "job_name": job_name, + "staging_location": "gs://test/staging", + "output": "gs://test/output", + "labels": {"foo": "bar", "airflow-version": TEST_VERSION}, + "region": "us-central1", + "impersonate_service_account": TEST_IMPERSONATION_ACCOUNT, + } + gcs_provide_file.assert_called_once_with(object_url=PY_FILE) + persist_link_mock.assert_called_once_with( + self.operator, + magic_mock, + expected_options["project"], + expected_options["region"], + self.operator.dataflow_job_id, + ) + beam_hook_mock.return_value.start_python_pipeline.assert_not_called() + dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with() + + @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowJobLink.persist") + @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook") + @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook") + @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook") + def test_on_kill_dataflow_runner(self, dataflow_hook_mock, _, __, ___): + self.operator.runner = "DataflowRunner" + dataflow_cancel_job = dataflow_hook_mock.return_value.cancel_job + with pytest.raises(TaskDeferred): + self.operator.execute(context=mock.MagicMock()) + self.operator.dataflow_job_id = JOB_ID + self.operator.on_kill() + dataflow_cancel_job.assert_called_once_with( + job_id=JOB_ID, project_id=self.operator.dataflow_config.project_id + ) + + @mock.patch("airflow.providers.apache.beam.operators.beam.BeamHook") + @mock.patch("airflow.providers.apache.beam.operators.beam.DataflowHook") + @mock.patch("airflow.providers.apache.beam.operators.beam.GCSHook") + def test_on_kill_direct_runner(self, _, dataflow_mock, __): + dataflow_cancel_job = dataflow_mock.return_value.cancel_job + with pytest.raises(TaskDeferred): + self.operator.execute(mock.MagicMock()) + self.operator.on_kill() + dataflow_cancel_job.assert_not_called() diff --git a/tests/providers/apache/beam/triggers/__init__.py b/tests/providers/apache/beam/triggers/__init__.py new file mode 100644 index 0000000000..13a83393a9 --- /dev/null +++ b/tests/providers/apache/beam/triggers/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/apache/beam/triggers/test_beam.py b/tests/providers/apache/beam/triggers/test_beam.py new file mode 100644 index 0000000000..ce22dda215 --- /dev/null +++ b/tests/providers/apache/beam/triggers/test_beam.py @@ -0,0 +1,107 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.providers.apache.beam.triggers.beam import BeamPipelineTrigger +from airflow.triggers.base import TriggerEvent + +HOOK_STATUS_STR = "airflow.providers.apache.beam.hooks.beam.BeamAsyncHook.start_python_pipeline_async" +CLASSPATH = "airflow.providers.apache.beam.triggers.beam.BeamPipelineTrigger" + +TASK_ID = "test_task" +LOCATION = "test-location" +INSTANCE_NAME = "airflow-test-instance" +INSTANCE = {"type": "BASIC", "displayName": INSTANCE_NAME} +PROJECT_ID = "test_project_id" +TEST_VARIABLES = {"output": "gs://bucket_test/output", "labels": {"airflow-version": "v2-7-0-dev0"}} +TEST_PY_FILE = "apache_beam.examples.wordcount" +TEST_PY_OPTIONS = [] +TEST_PY_INTERPRETER = "python3" +TEST_PY_REQUIREMENTS = ["apache-beam[gcp]==2.46.0"] +TEST_PY_PACKAGES = False +TEST_RUNNER = "DirectRunner" + + +@pytest.fixture +def trigger(): + return BeamPipelineTrigger( + variables=TEST_VARIABLES, + py_file=TEST_PY_FILE, + py_options=TEST_PY_OPTIONS, + py_interpreter=TEST_PY_INTERPRETER, + py_requirements=TEST_PY_REQUIREMENTS, + py_system_site_packages=TEST_PY_PACKAGES, + runner=TEST_RUNNER, + ) + + +class TestBeamPipelineTrigger: + def test_beam_trigger_serialization_should_execute_successfully(self, trigger): + """ + Asserts that the BeamPipelineTrigger correctly serializes its arguments + and classpath. + """ + classpath, kwargs = trigger.serialize() + assert classpath == CLASSPATH + assert kwargs == { + "variables": TEST_VARIABLES, + "py_file": TEST_PY_FILE, + "py_options": TEST_PY_OPTIONS, + "py_interpreter": TEST_PY_INTERPRETER, + "py_requirements": TEST_PY_REQUIREMENTS, + "py_system_site_packages": TEST_PY_PACKAGES, + "runner": TEST_RUNNER, + } + + @pytest.mark.asyncio + @mock.patch(HOOK_STATUS_STR) + async def test_beam_trigger_on_success_should_execute_successfully(self, mock_pipeline_status, trigger): + """ + Tests the BeamPipelineTrigger only fires once the job execution reaches a successful state. 
+ """ + mock_pipeline_status.return_value = 0 + generator = trigger.run() + actual = await generator.asend(None) + assert TriggerEvent({"status": "success", "message": "Pipeline has finished SUCCESSFULLY"}) == actual + + @pytest.mark.asyncio + @mock.patch(HOOK_STATUS_STR) + async def test_beam_trigger_error_should_execute_successfully(self, mock_pipeline_status, trigger): + """ + Test that BeamPipelineTrigger fires the correct event in case of an error. + """ + mock_pipeline_status.return_value = 1 + + generator = trigger.run() + actual = await generator.asend(None) + assert TriggerEvent({"status": "error", "message": "Operation failed"}) == actual + + @pytest.mark.asyncio + @mock.patch(HOOK_STATUS_STR) + async def test_beam_trigger_exception_should_execute_successfully(self, mock_pipeline_status, trigger): + """ + Test that BeamPipelineTrigger fires the correct event in case of an error. + """ + mock_pipeline_status.side_effect = Exception("Test exception") + + generator = trigger.run() + actual = await generator.asend(None) + assert TriggerEvent({"status": "error", "message": "Test exception"}) == actual diff --git a/tests/system/providers/apache/beam/example_python.py b/tests/system/providers/apache/beam/example_python.py index 50c469c73d..929844ec54 100644 --- a/tests/system/providers/apache/beam/example_python.py +++ b/tests/system/providers/apache/beam/example_python.py @@ -47,7 +47,7 @@ with models.DAG( task_id="start_python_pipeline_local_direct_runner", py_file="apache_beam.examples.wordcount", py_options=["-m"], - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, ) @@ -59,7 +59,7 @@ with models.DAG( py_file=GCS_PYTHON, py_options=[], pipeline_options={"output": GCS_OUTPUT}, - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, ) @@ -76,7 +76,7 @@ with models.DAG( "output": GCS_OUTPUT, }, py_options=[], - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, dataflow_config=DataflowConfiguration( @@ -90,7 +90,7 @@ with models.DAG( py_file="apache_beam.examples.wordcount", runner="SparkRunner", py_options=["-m"], - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, ) @@ -103,7 +103,7 @@ with models.DAG( pipeline_options={ "output": "/tmp/start_python_pipeline_local_flink_runner", }, - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, ) @@ -113,6 +113,7 @@ with models.DAG( start_python_pipeline_local_direct_runner, start_python_pipeline_direct_runner, ] + >> start_python_pipeline_dataflow_runner >> start_python_pipeline_local_flink_runner >> start_python_pipeline_local_spark_runner ) diff --git a/tests/system/providers/apache/beam/example_python.py b/tests/system/providers/apache/beam/example_python_async.py similarity index 82% copy from tests/system/providers/apache/beam/example_python.py copy to tests/system/providers/apache/beam/example_python_async.py index 50c469c73d..520e8adc4c 100644 --- a/tests/system/providers/apache/beam/example_python.py +++ b/tests/system/providers/apache/beam/example_python_async.py @@ -34,7 +34,7 @@ from tests.system.providers.apache.beam.utils import ( ) with 
models.DAG( - "example_beam_native_python", + dag_id="dataflow_native_python_async", start_date=START_DATE, schedule=None, # Override to match your needs catchup=False, @@ -42,30 +42,32 @@ with models.DAG( tags=["example"], ) as dag: - # [START howto_operator_start_python_direct_runner_pipeline_local_file] + # [START howto_operator_start_python_direct_runner_pipeline_local_file_async] start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator( task_id="start_python_pipeline_local_direct_runner", py_file="apache_beam.examples.wordcount", py_options=["-m"], - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, + deferrable=True, ) - # [END howto_operator_start_python_direct_runner_pipeline_local_file] + # [END howto_operator_start_python_direct_runner_pipeline_local_file_async] - # [START howto_operator_start_python_direct_runner_pipeline_gcs_file] + # [START howto_operator_start_python_direct_runner_pipeline_gcs_file_async] start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator( task_id="start_python_pipeline_direct_runner", py_file=GCS_PYTHON, py_options=[], pipeline_options={"output": GCS_OUTPUT}, - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, + deferrable=True, ) - # [END howto_operator_start_python_direct_runner_pipeline_gcs_file] + # [END howto_operator_start_python_direct_runner_pipeline_gcs_file_async] - # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file] + # [START howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async] start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator( task_id="start_python_pipeline_dataflow_runner", runner="DataflowRunner", @@ -76,25 +78,30 @@ with models.DAG( "output": GCS_OUTPUT, }, py_options=[], - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, dataflow_config=DataflowConfiguration( job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1" ), + deferrable=True, ) - # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file] + # [END howto_operator_start_python_dataflow_runner_pipeline_gcs_file_async] + # [START howto_operator_start_python_pipeline_local_runner_spark_runner_async] start_python_pipeline_local_spark_runner = BeamRunPythonPipelineOperator( task_id="start_python_pipeline_local_spark_runner", py_file="apache_beam.examples.wordcount", runner="SparkRunner", py_options=["-m"], - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, + deferrable=True, ) + # [END howto_operator_start_python_pipeline_local_runner_spark_runner_async] + # [START howto_operator_start_python_pipeline_local_runner_flink_runner_async] start_python_pipeline_local_flink_runner = BeamRunPythonPipelineOperator( task_id="start_python_pipeline_local_flink_runner", py_file="apache_beam.examples.wordcount", @@ -103,16 +110,19 @@ with models.DAG( pipeline_options={ "output": "/tmp/start_python_pipeline_local_flink_runner", }, - py_requirements=["apache-beam[gcp]==2.26.0"], + py_requirements=["apache-beam[gcp]==2.46.0"], py_interpreter="python3", py_system_site_packages=False, + deferrable=True, ) + # [END howto_operator_start_python_pipeline_local_runner_flink_runner_async] ( [ 
start_python_pipeline_local_direct_runner, start_python_pipeline_direct_runner, ] + >> start_python_pipeline_dataflow_runner >> start_python_pipeline_local_flink_runner >> start_python_pipeline_local_spark_runner )
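As a quick way to exercise the new mode end to end, a task modeled on the system tests above could be defined as follows. This is a sketch, not part of the commit: the DAG id and the output path are placeholders.

from datetime import datetime

from airflow import models
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

with models.DAG(
    dag_id="example_beam_python_deferrable",  # placeholder DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # While the wordcount pipeline runs, the task is deferred and its worker
    # slot is released; BeamAsyncHook does the waiting in the triggerer.
    wordcount_deferrable = BeamRunPythonPipelineOperator(
        task_id="wordcount_deferrable",
        py_file="apache_beam.examples.wordcount",
        py_options=["-m"],
        pipeline_options={"output": "/tmp/wordcount_output"},  # placeholder path
        py_requirements=["apache-beam[gcp]==2.46.0"],
        py_interpreter="python3",
        py_system_site_packages=False,
        deferrable=True,
    )

Relative to the synchronous examples, `deferrable=True` is the only change; everything else about the operator's interface stays the same.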