This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new d793fda391 Add ability to run streaming Job for BeamRunPythonPipelineOperator in non deferrable mode (#36108)
d793fda391 is described below

commit d793fda39161be9281d3d4da54e2e2b4f6344b4d
Author: Maksim <maks...@google.com>
AuthorDate: Mon Dec 18 01:15:23 2023 +0100

    Add ability to run streaming Job for BeamRunPythonPipelineOperator in non deferrable mode (#36108)
---
 airflow/providers/apache/beam/hooks/beam.py         |  14 ++-
 airflow/providers/apache/beam/operators/beam.py     |  48 +++++++-
 airflow/providers/google/cloud/hooks/dataflow.py    |  18 +++
 .../providers/google/cloud/operators/dataflow.py    |   6 +
 .../operators/cloud/dataflow.rst                    |  18 ++-
 tests/providers/apache/beam/hooks/test_beam.py      |  21 +++-
 tests/providers/apache/beam/operators/test_beam.py  |   1 +
 .../dataflow/example_dataflow_streaming_python.py   | 126 +++++++++++++++++++++
 8 files changed, 237 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py
index c27cc41710..efea53560b 100644
--- a/airflow/providers/apache/beam/hooks/beam.py
+++ b/airflow/providers/apache/beam/hooks/beam.py
@@ -90,6 +90,7 @@ def process_fd(
     fd,
     log: logging.Logger,
     process_line_callback: Callable[[str], None] | None = None,
+    check_job_status_callback: Callable[[], bool | None] | None = None,
 ):
     """
     Print output to logs.
@@ -111,6 +112,8 @@ def process_fd(
         if process_line_callback:
             process_line_callback(line)
         func_log(line.rstrip("\n"))
+        if check_job_status_callback and check_job_status_callback():
+            return


 def run_beam_command(
@@ -118,6 +121,7 @@ def run_beam_command(
     log: logging.Logger,
     process_line_callback: Callable[[str], None] | None = None,
     working_directory: str | None = None,
+    check_job_status_callback: Callable[[], bool | None] | None = None,
 ) -> None:
     """
     Run pipeline command in subprocess.
@@ -149,14 +153,16 @@ def run_beam_command(
             continue

         for readable_fd in readable_fds:
-            process_fd(proc, readable_fd, log, process_line_callback)
+            process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)
+            if check_job_status_callback and check_job_status_callback():
+                return

         if proc.poll() is not None:
             break

     # Corner case: check if more output was created between the last read and the process termination
     for readable_fd in reads:
-        process_fd(proc, readable_fd, log, process_line_callback)
+        process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)

     log.info("Process exited with return code: %s", proc.returncode)

@@ -187,6 +193,7 @@ class BeamHook(BaseHook):
         command_prefix: list[str],
         process_line_callback: Callable[[str], None] | None = None,
         working_directory: str | None = None,
+        check_job_status_callback: Callable[[], bool | None] | None = None,
     ) -> None:
         cmd = [*command_prefix, f"--runner={self.runner}"]
         if variables:
@@ -196,6 +203,7 @@ class BeamHook(BaseHook):
             process_line_callback=process_line_callback,
             working_directory=working_directory,
             log=self.log,
+            check_job_status_callback=check_job_status_callback,
         )

     def start_python_pipeline(
@@ -207,6 +215,7 @@ class BeamHook(BaseHook):
         py_requirements: list[str] | None = None,
         py_system_site_packages: bool = False,
         process_line_callback: Callable[[str], None] | None = None,
+        check_job_status_callback: Callable[[], bool | None] | None = None,
     ):
         """
         Start Apache Beam python pipeline.
@@ -279,6 +288,7 @@ class BeamHook(BaseHook):
             variables=variables,
             command_prefix=command_prefix,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
         )

     def start_java_pipeline(
diff --git a/airflow/providers/apache/beam/operators/beam.py b/airflow/providers/apache/beam/operators/beam.py
index 943fc5b697..876f47fa2d 100644
--- a/airflow/providers/apache/beam/operators/beam.py
+++ b/airflow/providers/apache/beam/operators/beam.py
@@ -67,7 +67,7 @@ class BeamDataflowMixin(metaclass=ABCMeta):
         self,
         pipeline_options: dict,
         job_name_variable_key: str | None = None,
-    ) -> tuple[str, dict, Callable[[str], None]]:
+    ) -> tuple[str, dict, Callable[[str], None], Callable[[], bool | None]]:
         self.dataflow_hook = self.__set_dataflow_hook()
         self.dataflow_config.project_id = self.dataflow_config.project_id or self.dataflow_hook.project_id
         dataflow_job_name = self.__get_dataflow_job_name()
@@ -75,7 +75,8 @@ class BeamDataflowMixin(metaclass=ABCMeta):
             pipeline_options, dataflow_job_name, job_name_variable_key
         )
         process_line_callback = self.__get_dataflow_process_callback()
-        return dataflow_job_name, pipeline_options, process_line_callback
+        check_job_status_callback = self.__check_dataflow_job_status_callback()
+        return dataflow_job_name, pipeline_options, process_line_callback, check_job_status_callback

     def __set_dataflow_hook(self) -> DataflowHook:
         self.dataflow_hook = DataflowHook(
@@ -123,6 +124,19 @@ class BeamDataflowMixin(metaclass=ABCMeta):
             on_new_job_id_callback=set_current_dataflow_job_id
         )

+    def __check_dataflow_job_status_callback(self) -> Callable[[], bool | None]:
+        def check_dataflow_job_status() -> bool | None:
+            if self.dataflow_job_id and self.dataflow_hook:
+                return self.dataflow_hook.is_job_done(
+                    location=self.dataflow_config.location,
+                    project_id=self.dataflow_config.project_id,
+                    job_id=self.dataflow_job_id,
+                )
+            else:
+                return None
+
+        return check_dataflow_job_status
+

 class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
     """
@@ -184,14 +198,20 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
         self,
         format_pipeline_options: bool = False,
         job_name_variable_key: str | None = None,
-    ) -> tuple[bool, str | None, dict, Callable[[str], None] | None]:
+    ) -> tuple[bool, str | None, dict, Callable[[str], None] | None, Callable[[], bool | None] | None]:
         self.beam_hook = BeamHook(runner=self.runner)
         pipeline_options = self.default_pipeline_options.copy()
         process_line_callback: Callable[[str], None] | None = None
+        check_job_status_callback: Callable[[], bool | None] | None = None
         is_dataflow = self.runner.lower() == BeamRunnerType.DataflowRunner.lower()
         dataflow_job_name: str | None = None
         if is_dataflow:
-            dataflow_job_name, pipeline_options, process_line_callback = self._set_dataflow(
+            (
+                dataflow_job_name,
+                pipeline_options,
+                process_line_callback,
+                check_job_status_callback,
+            ) = self._set_dataflow(
                 pipeline_options=pipeline_options,
                 job_name_variable_key=job_name_variable_key,
             )
@@ -203,9 +223,21 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
             snake_case_pipeline_options = {
                 convert_camel_to_snake(key): pipeline_options[key] for key in pipeline_options
             }
-            return is_dataflow, dataflow_job_name, snake_case_pipeline_options, process_line_callback
+            return (
+                is_dataflow,
+                dataflow_job_name,
+                snake_case_pipeline_options,
+                process_line_callback,
+                check_job_status_callback,
+            )

-        return is_dataflow, dataflow_job_name, pipeline_options, process_line_callback
+        return (
+            is_dataflow,
+            dataflow_job_name,
+            pipeline_options,
+            process_line_callback,
+            check_job_status_callback,
+        )


 class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
@@ -297,6 +329,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
             self.dataflow_job_name,
             self.snake_case_pipeline_options,
             self.process_line_callback,
+            self.check_job_status_callback,
         ) = self._init_pipeline_options(format_pipeline_options=True, job_name_variable_key="job_name")
         if not self.beam_hook:
             raise AirflowException("Beam hook is not defined.")
@@ -329,6 +362,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
                     py_requirements=self.py_requirements,
                     py_system_site_packages=self.py_system_site_packages,
                     process_line_callback=self.process_line_callback,
+                    check_job_status_callback=self.check_job_status_callback,
                 )
             DataflowJobLink.persist(
                 self,
@@ -495,6 +529,7 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
             dataflow_job_name,
             pipeline_options,
             process_line_callback,
+            _,
         ) = self._init_pipeline_options()

         if not self.beam_hook:
@@ -668,6 +703,7 @@ class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
             dataflow_job_name,
             snake_case_pipeline_options,
             process_line_callback,
+            _,
         ) = self._init_pipeline_options(format_pipeline_options=True, job_name_variable_key="job_name")

         if not self.beam_hook:
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 1769839a0b..6c051ce619 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -1203,6 +1203,24 @@ class DataflowHook(GoogleBaseHook):
         )
         job_controller.wait_for_done()

+    @GoogleBaseHook.fallback_to_default_project_id
+    def is_job_done(self, location: str, project_id: str, job_id: str) -> bool:
+        """
+        Check that Dataflow job is started(for streaming job) or finished(for batch job).
+
+        :param location: location the job is running
+        :param project_id: Google Cloud project ID in which to start a job
+        :param job_id: Dataflow job ID
+        """
+        job_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            location=location,
+        )
+        job = job_controller.fetch_job_by_id(job_id)
+
+        return job_controller._check_dataflow_job_state(job)
+

 class AsyncDataflowHook(GoogleBaseAsyncHook):
     """Async hook class for dataflow service."""
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 59b2715886..ec813c55eb 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -1286,6 +1286,12 @@ class DataflowStopJobOperator(GoogleCloudBaseOperator):
     :param stop_timeout: wait time in seconds for successful job canceling/draining
     """

+    template_fields = [
+        "job_id",
+        "project_id",
+        "impersonation_chain",
+    ]
+
     def __init__(
         self,
         job_name_prefix: str | None = None,
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
index 366f14f872..3851cb8fa8 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
@@ -65,9 +65,9 @@ Starting Non-templated pipeline
 To create a new pipeline using the source file (JAR in Java or Python file) use
 the create job operators. The source file can be located on GCS or on the local filesystem.
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
 or
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`

 .. _howto/operator:DataflowCreateJavaJobOperator:

@@ -75,7 +75,7 @@ Java SDK pipelines
 """"""""""""""""""

 For Java pipeline the ``jar`` argument must be specified for
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
 as it contains the pipeline to be executed on Dataflow. The JAR can be available on GCS that Airflow
 has the ability to download or available on the local filesystem (provide the absolute path to it).

@@ -101,7 +101,7 @@ Python SDK pipelines
 """"""""""""""""""""

 The ``py_file`` argument must be specified for
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`
 as it contains the pipeline to be executed on Dataflow. The Python file can be available on GCS that Airflow
 has the ability to download or available on the local filesystem (provide the absolute path to it).

@@ -129,8 +129,8 @@ Dataflow has multiple options of executing pipelines. It can be done in the following modes:
 batch asynchronously (fire and forget), batch blocking (wait until completion), or streaming (run indefinitely).
 In Airflow it is best practice to use asynchronous batch pipelines or streams and use sensors to listen for expected job state.

-By default :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`,
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`,
+By default :class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`,
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`,
 :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator` and
 :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator`
 have argument ``wait_until_finished`` set to ``None`` which cause different behaviour depends on the type of pipeline:

@@ -175,6 +175,12 @@ Streaming execution
 To execute a streaming Dataflow job, ensure the streaming option is set (for Python) or read from an unbounded
 data source, such as Pub/Sub, in your pipeline (for Java).

+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_streaming_python_job]
+    :end-before: [END howto_operator_start_streaming_python_job]
+
 Setting argument ``drain_pipeline`` to ``True`` allows to stop streaming job by draining it
 instead of canceling during killing task instance.
diff --git a/tests/providers/apache/beam/hooks/test_beam.py b/tests/providers/apache/beam/hooks/test_beam.py
index 1cfe71121c..ce33b683e2 100644
--- a/tests/providers/apache/beam/hooks/test_beam.py
+++ b/tests/providers/apache/beam/hooks/test_beam.py
@@ -67,12 +67,14 @@ class TestBeamHook:
     def test_start_python_pipeline(self, mock_check_output, mock_runner):
         hook = BeamHook(runner=DEFAULT_RUNNER)
         process_line_callback = MagicMock()
+        check_job_status_callback = MagicMock()

         hook.start_python_pipeline(
             variables=copy.deepcopy(BEAM_VARIABLES_PY),
             py_file=PY_FILE,
             py_options=PY_OPTIONS,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
         )

         expected_cmd = [
@@ -88,6 +90,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=None,
             log=ANY,
+            check_job_status_callback=check_job_status_callback,
         )

     @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.35.0")
@@ -108,6 +111,7 @@ class TestBeamHook:
                 py_requirements=None,
                 py_system_site_packages=False,
                 process_line_callback=MagicMock(),
+                check_job_status_callback=MagicMock(),
             )

     @pytest.mark.parametrize(
@@ -126,6 +130,7 @@ class TestBeamHook:
     ):
         hook = BeamHook(runner=DEFAULT_RUNNER)
         process_line_callback = MagicMock()
+        check_job_status_callback = MagicMock()

         hook.start_python_pipeline(
             variables=copy.deepcopy(BEAM_VARIABLES_PY),
@@ -133,6 +138,7 @@ class TestBeamHook:
             py_options=PY_OPTIONS,
             py_interpreter=py_interpreter,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
         )

         expected_cmd = [
@@ -148,6 +154,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=None,
             log=ANY,
+            check_job_status_callback=check_job_status_callback,
         )

     @pytest.mark.parametrize(
@@ -172,6 +179,7 @@ class TestBeamHook:
         hook = BeamHook(runner=DEFAULT_RUNNER)
         mock_virtualenv.return_value = "/dummy_dir/bin/python"
         process_line_callback = MagicMock()
+        check_job_status_callback = MagicMock()

         hook.start_python_pipeline(
             variables=copy.deepcopy(BEAM_VARIABLES_PY),
@@ -180,6 +188,7 @@ class TestBeamHook:
             py_requirements=current_py_requirements,
             py_system_site_packages=current_py_system_site_packages,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
         )

         expected_cmd = [
@@ -193,6 +202,7 @@ class TestBeamHook:
         mock_runner.assert_called_once_with(
             cmd=expected_cmd,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
             working_directory=None,
             log=ANY,
         )
@@ -211,6 +221,7 @@ class TestBeamHook:
         hook = BeamHook(runner=DEFAULT_RUNNER)
         wait_for_done = mock_runner.return_value.wait_for_done
         process_line_callback = MagicMock()
+        check_job_status_callback = MagicMock()

         with pytest.raises(AirflowException, match=r"Invalid method invocation\."):
             hook.start_python_pipeline(
@@ -219,6 +230,7 @@ class TestBeamHook:
                 py_options=PY_OPTIONS,
                 py_requirements=[],
                 process_line_callback=process_line_callback,
+                check_job_status_callback=check_job_status_callback,
             )

         mock_runner.assert_not_called()
@@ -244,7 +256,11 @@ class TestBeamHook:
             '--labels={"foo":"bar"}',
         ]
         mock_runner.assert_called_once_with(
-            cmd=expected_cmd, process_line_callback=process_line_callback, working_directory=None, log=ANY
+            cmd=expected_cmd,
+            process_line_callback=process_line_callback,
+            working_directory=None,
+            log=ANY,
+            check_job_status_callback=None,
         )

     @mock.patch(BEAM_STRING.format("run_beam_command"))
@@ -273,6 +289,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=None,
             log=ANY,
+            check_job_status_callback=None,
         )

     @mock.patch(BEAM_STRING.format("shutil.which"))
@@ -303,6 +320,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=go_workspace,
             log=ANY,
+            check_job_status_callback=None,
         )

     @mock.patch(BEAM_STRING.format("shutil.which"))
@@ -348,6 +366,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=None,
             log=ANY,
+            check_job_status_callback=None,
         )

diff --git a/tests/providers/apache/beam/operators/test_beam.py b/tests/providers/apache/beam/operators/test_beam.py
index d67b3ff647..8b6f57cccc 100644
--- a/tests/providers/apache/beam/operators/test_beam.py
+++ b/tests/providers/apache/beam/operators/test_beam.py
@@ -160,6 +160,7 @@ class TestBeamRunPythonPipelineOperator:
             py_requirements=None,
             py_system_site_packages=False,
             process_line_callback=mock.ANY,
+            check_job_status_callback=mock.ANY,
         )
         dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with()
diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
new file mode 100644
index 0000000000..0489117e5d
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for testing Google Dataflow Beam Pipeline Operator with Python for Streaming job.
+""" +from __future__ import annotations + +import os +from datetime import datetime + +from airflow.models.dag import DAG +from airflow.providers.apache.beam.hooks.beam import BeamRunnerType +from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator +from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator +from airflow.providers.google.cloud.operators.pubsub import ( + PubSubCreateTopicOperator, + PubSubDeleteTopicOperator, +) +from airflow.utils.trigger_rule import TriggerRule + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default") +DAG_ID = "dataflow_native_python_streaming" + +RESOURCE_DATA_BUCKET = "airflow-system-tests-resources" +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" + +GCS_TMP = f"gs://{BUCKET_NAME}/temp/" +GCS_STAGING = f"gs://{BUCKET_NAME}/staging/" +GCS_PYTHON_SCRIPT = f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/streaming_wordcount.py" +LOCATION = "europe-west3" +TOPIC_ID = f"topic-{DAG_ID}" + +default_args = { + "dataflow_default_options": { + "tempLocation": GCS_TMP, + "stagingLocation": GCS_STAGING, + } +} + +with DAG( + DAG_ID, + default_args=default_args, + schedule="@once", + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example", "dataflow"], +) as dag: + create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME) + + create_pub_sub_topic = PubSubCreateTopicOperator( + task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False + ) + + # [START howto_operator_start_streaming_python_job] + start_streaming_python_job = BeamRunPythonPipelineOperator( + runner=BeamRunnerType.DataflowRunner, + task_id="start_streaming_python_job", + py_file=GCS_PYTHON_SCRIPT, + py_options=[], + pipeline_options={ + "temp_location": GCS_TMP, + "input_topic": "projects/pubsub-public-data/topics/taxirides-realtime", + "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}", + "streaming": True, + }, + py_requirements=["apache-beam[gcp]==2.46.0"], + py_interpreter="python3", + py_system_site_packages=False, + dataflow_config={"location": LOCATION}, + ) + # [END howto_operator_start_streaming_python_job] + + stop_dataflow_job = DataflowStopJobOperator( + task_id="stop_dataflow_job", + location=LOCATION, + job_id="{{ task_instance.xcom_pull(task_ids='start_streaming_python_job')['dataflow_job_id'] }}", + ) + + delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID) + delete_topic.trigger_rule = TriggerRule.ALL_DONE + + delete_bucket = GCSDeleteBucketOperator( + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE + ) + + ( + # TEST SETUP + create_bucket + >> create_pub_sub_topic + # TEST BODY + >> start_streaming_python_job + # TEST TEARDOWN + >> stop_dataflow_job + >> delete_topic + >> delete_bucket + ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "teardown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)