This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d793fda391 Add ability to run streaming Job for BeamRunPythonPipelineOperator in non deferrable mode (#36108)
d793fda391 is described below

commit d793fda39161be9281d3d4da54e2e2b4f6344b4d
Author: Maksim <maks...@google.com>
AuthorDate: Mon Dec 18 01:15:23 2023 +0100

    Add ability to run streaming Job for BeamRunPythonPipelineOperator in non deferrable mode (#36108)
---
 airflow/providers/apache/beam/hooks/beam.py        |  14 ++-
 airflow/providers/apache/beam/operators/beam.py    |  48 +++++++-
 airflow/providers/google/cloud/hooks/dataflow.py   |  18 +++
 .../providers/google/cloud/operators/dataflow.py   |   6 +
 .../operators/cloud/dataflow.rst                   |  18 ++-
 tests/providers/apache/beam/hooks/test_beam.py     |  21 +++-
 tests/providers/apache/beam/operators/test_beam.py |   1 +
 .../dataflow/example_dataflow_streaming_python.py  | 126 +++++++++++++++++++++
 8 files changed, 237 insertions(+), 15 deletions(-)

diff --git a/airflow/providers/apache/beam/hooks/beam.py b/airflow/providers/apache/beam/hooks/beam.py
index c27cc41710..efea53560b 100644
--- a/airflow/providers/apache/beam/hooks/beam.py
+++ b/airflow/providers/apache/beam/hooks/beam.py
@@ -90,6 +90,7 @@ def process_fd(
     fd,
     log: logging.Logger,
     process_line_callback: Callable[[str], None] | None = None,
+    check_job_status_callback: Callable[[], bool | None] | None = None,
 ):
     """
     Print output to logs.
@@ -111,6 +112,8 @@ def process_fd(
         if process_line_callback:
             process_line_callback(line)
         func_log(line.rstrip("\n"))
+        if check_job_status_callback and check_job_status_callback():
+            return
 
 
 def run_beam_command(
@@ -118,6 +121,7 @@ def run_beam_command(
     log: logging.Logger,
     process_line_callback: Callable[[str], None] | None = None,
     working_directory: str | None = None,
+    check_job_status_callback: Callable[[], bool | None] | None = None,
 ) -> None:
     """
     Run pipeline command in subprocess.
@@ -149,14 +153,16 @@ def run_beam_command(
             continue
 
         for readable_fd in readable_fds:
-            process_fd(proc, readable_fd, log, process_line_callback)
+            process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)
+            if check_job_status_callback and check_job_status_callback():
+                return
 
         if proc.poll() is not None:
             break
 
     # Corner case: check if more output was created between the last read and the process termination
     for readable_fd in reads:
-        process_fd(proc, readable_fd, log, process_line_callback)
+        process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)
 
     log.info("Process exited with return code: %s", proc.returncode)
 
@@ -187,6 +193,7 @@ class BeamHook(BaseHook):
         command_prefix: list[str],
         process_line_callback: Callable[[str], None] | None = None,
         working_directory: str | None = None,
+        check_job_status_callback: Callable[[], bool | None] | None = None,
     ) -> None:
         cmd = [*command_prefix, f"--runner={self.runner}"]
         if variables:
@@ -196,6 +203,7 @@ class BeamHook(BaseHook):
             process_line_callback=process_line_callback,
             working_directory=working_directory,
             log=self.log,
+            check_job_status_callback=check_job_status_callback,
         )
 
     def start_python_pipeline(
@@ -207,6 +215,7 @@ class BeamHook(BaseHook):
         py_requirements: list[str] | None = None,
         py_system_site_packages: bool = False,
         process_line_callback: Callable[[str], None] | None = None,
+        check_job_status_callback: Callable[[], bool | None] | None = None,
     ):
         """
         Start Apache Beam python pipeline.
@@ -279,6 +288,7 @@ class BeamHook(BaseHook):
                 variables=variables,
                 command_prefix=command_prefix,
                 process_line_callback=process_line_callback,
+                check_job_status_callback=check_job_status_callback,
             )
 
     def start_java_pipeline(
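
A minimal, self-contained sketch of the callback pattern introduced above in process_fd/run_beam_command: the launcher's output is streamed line by line, and reading stops early once check_job_status_callback reports the job as up. Only the two callback parameters mirror the provider code; every other name here is illustrative.

    # Illustrative sketch, not provider code; only the callback parameters match.
    from __future__ import annotations

    import logging
    import subprocess
    import sys
    from typing import Callable

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger("beam-sketch")


    def stream_output(
        cmd: list[str],
        process_line_callback: Callable[[str], None] | None = None,
        check_job_status_callback: Callable[[], bool | None] | None = None,
    ) -> None:
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        for line in proc.stdout:
            if process_line_callback:
                process_line_callback(line)
            log.info(line.rstrip("\n"))
            # The new early exit: once the callback says the job is up (streaming)
            # or done (batch), stop tailing the launcher instead of blocking on it.
            if check_job_status_callback and check_job_status_callback():
                log.info("Job reported as running/done; stop waiting for the launcher.")
                return
        proc.wait()
        log.info("Process exited with return code: %s", proc.returncode)


    # Toy run: pretend the job is "running" after three output lines.
    seen = {"lines": 0}

    def count_line(_line: str) -> None:
        seen["lines"] += 1

    stream_output(
        [sys.executable, "-c", "print('a'); print('b'); print('c'); print('d')"],
        process_line_callback=count_line,
        check_job_status_callback=lambda: seen["lines"] >= 3,
    )

This early return is what lets a streaming Dataflow job, whose launcher never exits on its own, hand control back to the operator once the job is running.
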
diff --git a/airflow/providers/apache/beam/operators/beam.py b/airflow/providers/apache/beam/operators/beam.py
index 943fc5b697..876f47fa2d 100644
--- a/airflow/providers/apache/beam/operators/beam.py
+++ b/airflow/providers/apache/beam/operators/beam.py
@@ -67,7 +67,7 @@ class BeamDataflowMixin(metaclass=ABCMeta):
         self,
         pipeline_options: dict,
         job_name_variable_key: str | None = None,
-    ) -> tuple[str, dict, Callable[[str], None]]:
+    ) -> tuple[str, dict, Callable[[str], None], Callable[[], bool | None]]:
         self.dataflow_hook = self.__set_dataflow_hook()
         self.dataflow_config.project_id = self.dataflow_config.project_id or self.dataflow_hook.project_id
         dataflow_job_name = self.__get_dataflow_job_name()
@@ -75,7 +75,8 @@ class BeamDataflowMixin(metaclass=ABCMeta):
             pipeline_options, dataflow_job_name, job_name_variable_key
         )
         process_line_callback = self.__get_dataflow_process_callback()
-        return dataflow_job_name, pipeline_options, process_line_callback
+        check_job_status_callback = self.__check_dataflow_job_status_callback()
+        return dataflow_job_name, pipeline_options, process_line_callback, check_job_status_callback
 
     def __set_dataflow_hook(self) -> DataflowHook:
         self.dataflow_hook = DataflowHook(
@@ -123,6 +124,19 @@ class BeamDataflowMixin(metaclass=ABCMeta):
             on_new_job_id_callback=set_current_dataflow_job_id
         )
 
+    def __check_dataflow_job_status_callback(self) -> Callable[[], bool | None]:
+        def check_dataflow_job_status() -> bool | None:
+            if self.dataflow_job_id and self.dataflow_hook:
+                return self.dataflow_hook.is_job_done(
+                    location=self.dataflow_config.location,
+                    project_id=self.dataflow_config.project_id,
+                    job_id=self.dataflow_job_id,
+                )
+            else:
+                return None
+
+        return check_dataflow_job_status
+
 
 class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
     """
@@ -184,14 +198,20 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
         self,
         format_pipeline_options: bool = False,
         job_name_variable_key: str | None = None,
-    ) -> tuple[bool, str | None, dict, Callable[[str], None] | None]:
+    ) -> tuple[bool, str | None, dict, Callable[[str], None] | None, Callable[[], bool | None] | None]:
         self.beam_hook = BeamHook(runner=self.runner)
         pipeline_options = self.default_pipeline_options.copy()
         process_line_callback: Callable[[str], None] | None = None
+        check_job_status_callback: Callable[[], bool | None] | None = None
         is_dataflow = self.runner.lower() == BeamRunnerType.DataflowRunner.lower()
         dataflow_job_name: str | None = None
         if is_dataflow:
-            dataflow_job_name, pipeline_options, process_line_callback = self._set_dataflow(
+            (
+                dataflow_job_name,
+                pipeline_options,
+                process_line_callback,
+                check_job_status_callback,
+            ) = self._set_dataflow(
                 pipeline_options=pipeline_options,
                 job_name_variable_key=job_name_variable_key,
             )
@@ -203,9 +223,21 @@ class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
             snake_case_pipeline_options = {
                 convert_camel_to_snake(key): pipeline_options[key] for key in pipeline_options
             }
-            return is_dataflow, dataflow_job_name, snake_case_pipeline_options, process_line_callback
+            return (
+                is_dataflow,
+                dataflow_job_name,
+                snake_case_pipeline_options,
+                process_line_callback,
+                check_job_status_callback,
+            )
 
-        return is_dataflow, dataflow_job_name, pipeline_options, process_line_callback
+        return (
+            is_dataflow,
+            dataflow_job_name,
+            pipeline_options,
+            process_line_callback,
+            check_job_status_callback,
+        )
 
 
 class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
@@ -297,6 +329,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
             self.dataflow_job_name,
             self.snake_case_pipeline_options,
             self.process_line_callback,
+            self.check_job_status_callback,
         ) = self._init_pipeline_options(format_pipeline_options=True, job_name_variable_key="job_name")
         if not self.beam_hook:
             raise AirflowException("Beam hook is not defined.")
@@ -329,6 +362,7 @@ class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
                         py_requirements=self.py_requirements,
                         py_system_site_packages=self.py_system_site_packages,
                         process_line_callback=self.process_line_callback,
+                        check_job_status_callback=self.check_job_status_callback,
                     )
                 DataflowJobLink.persist(
                     self,
@@ -495,6 +529,7 @@ class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
             dataflow_job_name,
             pipeline_options,
             process_line_callback,
+            _,
         ) = self._init_pipeline_options()
 
         if not self.beam_hook:
@@ -668,6 +703,7 @@ class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
             dataflow_job_name,
             snake_case_pipeline_options,
             process_line_callback,
+            _,
         ) = self._init_pipeline_options(format_pipeline_options=True, job_name_variable_key="job_name")
 
         if not self.beam_hook:
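
The __check_dataflow_job_status_callback helper above returns a closure over the operator's own state, so the Beam hook can poll the job without knowing anything about Dataflow. A rough, hypothetical sketch of that pattern (FakeDataflowClient and PipelineLauncher are stand-ins, not Airflow classes):

    # Hypothetical stand-ins illustrating the closure pattern; not Airflow code.
    from __future__ import annotations

    from typing import Callable


    class FakeDataflowClient:
        """Stand-in for DataflowHook.is_job_done; reports done after three polls."""

        def __init__(self) -> None:
            self.polls = 0

        def is_job_done(self, job_id: str) -> bool:
            self.polls += 1
            return self.polls >= 3


    class PipelineLauncher:
        def __init__(self, client: FakeDataflowClient) -> None:
            self.client = client
            self.job_id: str | None = None  # set once the launcher logs the job id

        def check_job_status_callback(self) -> Callable[[], bool | None]:
            def check() -> bool | None:
                if self.job_id is None:
                    return None  # nothing to poll yet, keep reading launcher output
                return self.client.is_job_done(self.job_id)

            return check


    launcher = PipelineLauncher(FakeDataflowClient())
    callback = launcher.check_job_status_callback()
    print(callback())                      # None: job id not known yet
    launcher.job_id = "job-123"
    print([callback() for _ in range(3)])  # [False, False, True]
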
diff --git a/airflow/providers/google/cloud/hooks/dataflow.py b/airflow/providers/google/cloud/hooks/dataflow.py
index 1769839a0b..6c051ce619 100644
--- a/airflow/providers/google/cloud/hooks/dataflow.py
+++ b/airflow/providers/google/cloud/hooks/dataflow.py
@@ -1203,6 +1203,24 @@ class DataflowHook(GoogleBaseHook):
         )
         job_controller.wait_for_done()
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def is_job_done(self, location: str, project_id: str, job_id: str) -> bool:
+        """
+        Check that Dataflow job is started (for streaming job) or finished (for batch job).
+
+        :param location: location the job is running
+        :param project_id: Google Cloud project ID in which to start a job
+        :param job_id: Dataflow job ID
+        """
+        job_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            location=location,
+        )
+        job = job_controller.fetch_job_by_id(job_id)
+
+        return job_controller._check_dataflow_job_state(job)
+
 
 class AsyncDataflowHook(GoogleBaseAsyncHook):
     """Async hook class for dataflow service."""
diff --git a/airflow/providers/google/cloud/operators/dataflow.py b/airflow/providers/google/cloud/operators/dataflow.py
index 59b2715886..ec813c55eb 100644
--- a/airflow/providers/google/cloud/operators/dataflow.py
+++ b/airflow/providers/google/cloud/operators/dataflow.py
@@ -1286,6 +1286,12 @@ class DataflowStopJobOperator(GoogleCloudBaseOperator):
     :param stop_timeout: wait time in seconds for successful job canceling/draining
     """
 
+    template_fields = [
+        "job_id",
+        "project_id",
+        "impersonation_chain",
+    ]
+
     def __init__(
         self,
         job_name_prefix: str | None = None,
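
With job_id, project_id and impersonation_chain now in template_fields, DataflowStopJobOperator can receive the streaming job's id through Jinja at runtime. A small sketch under those assumptions (the DAG id and project id are made up; the XCom expression mirrors the example DAG added further below):

    # Placeholder DAG id and project; XCom expression matches the new example DAG.
    from datetime import datetime

    from airflow.models.dag import DAG
    from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator

    with DAG("stop_streaming_job_sketch", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
        stop_dataflow_job = DataflowStopJobOperator(
            task_id="stop_dataflow_job",
            location="europe-west3",
            project_id="my-gcp-project",
            job_id=(
                "{{ task_instance.xcom_pull(task_ids='start_streaming_python_job')"
                "['dataflow_job_id'] }}"
            ),
        )
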
diff --git a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
index 366f14f872..3851cb8fa8 100644
--- a/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
+++ b/docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
@@ -65,9 +65,9 @@ Starting Non-templated pipeline
 
 To create a new pipeline using the source file (JAR in Java or Python file) use
 the create job operators. The source file can be located on GCS or on the local filesystem.
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
 or
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`
 
 .. _howto/operator:DataflowCreateJavaJobOperator:
 
@@ -75,7 +75,7 @@ Java SDK pipelines
 """"""""""""""""""
 
 For Java pipeline the ``jar`` argument must be specified for
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
 as it contains the pipeline to be executed on Dataflow. The JAR can be available on GCS that Airflow
 has the ability to download or available on the local filesystem (provide the absolute path to it).
 
@@ -101,7 +101,7 @@ Python SDK pipelines
 """"""""""""""""""""
 
 The ``py_file`` argument must be specified for
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`
 as it contains the pipeline to be executed on Dataflow. The Python file can be available on GCS that Airflow
 has the ability to download or available on the local filesystem (provide the absolute path to it).
 
@@ -129,8 +129,8 @@ Dataflow has multiple options of executing pipelines. It can be done in the foll
 batch asynchronously (fire and forget), batch blocking (wait until completion), or streaming (run indefinitely).
 In Airflow it is best practice to use asynchronous batch pipelines or streams and use sensors to listen for expected job state.
 
-By default :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`,
-:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`,
+By default :class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`,
+:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`,
 :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator` and
 :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator`
 have argument ``wait_until_finished`` set to ``None`` which cause different behaviour depends on the type of pipeline:
@@ -175,6 +175,12 @@ Streaming execution
 To execute a streaming Dataflow job, ensure the streaming option is set (for Python) or read from an unbounded data
 source, such as Pub/Sub, in your pipeline (for Java).
 
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_start_streaming_python_job]
+    :end-before: [END howto_operator_start_streaming_python_job]
+
+Setting argument ``drain_pipeline`` to ``True`` allows to stop streaming job by draining it
 instead of canceling during killing task instance.
 
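
As a rough illustration of the drain behaviour described above: passing drain_pipeline=True through dataflow_config asks Airflow to drain (rather than cancel) the streaming job when the task instance is killed. Bucket path and DAG id below are placeholders borrowed from the streaming example added in this commit.

    # Placeholder path and DAG id; drain_pipeline is forwarded via dataflow_config.
    from datetime import datetime

    from airflow.models.dag import DAG
    from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

    with DAG("streaming_drain_sketch", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
        start_streaming_python_job = BeamRunPythonPipelineOperator(
            task_id="start_streaming_python_job",
            runner=BeamRunnerType.DataflowRunner,
            py_file="gs://my-bucket/dataflow/python/streaming_wordcount.py",
            pipeline_options={"streaming": True},
            py_requirements=["apache-beam[gcp]==2.46.0"],
            dataflow_config={"location": "europe-west3", "drain_pipeline": True},
        )
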
diff --git a/tests/providers/apache/beam/hooks/test_beam.py b/tests/providers/apache/beam/hooks/test_beam.py
index 1cfe71121c..ce33b683e2 100644
--- a/tests/providers/apache/beam/hooks/test_beam.py
+++ b/tests/providers/apache/beam/hooks/test_beam.py
@@ -67,12 +67,14 @@ class TestBeamHook:
     def test_start_python_pipeline(self, mock_check_output, mock_runner):
         hook = BeamHook(runner=DEFAULT_RUNNER)
         process_line_callback = MagicMock()
+        check_job_status_callback = MagicMock()
 
         hook.start_python_pipeline(
             variables=copy.deepcopy(BEAM_VARIABLES_PY),
             py_file=PY_FILE,
             py_options=PY_OPTIONS,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
         )
 
         expected_cmd = [
@@ -88,6 +90,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=None,
             log=ANY,
+            check_job_status_callback=check_job_status_callback,
         )
 
     @mock.patch("airflow.providers.apache.beam.hooks.beam.subprocess.check_output", return_value=b"2.35.0")
@@ -108,6 +111,7 @@ class TestBeamHook:
                 py_requirements=None,
                 py_system_site_packages=False,
                 process_line_callback=MagicMock(),
+                check_job_status_callback=MagicMock(),
             )
 
     @pytest.mark.parametrize(
@@ -126,6 +130,7 @@ class TestBeamHook:
     ):
         hook = BeamHook(runner=DEFAULT_RUNNER)
         process_line_callback = MagicMock()
+        check_job_status_callback = MagicMock()
 
         hook.start_python_pipeline(
             variables=copy.deepcopy(BEAM_VARIABLES_PY),
@@ -133,6 +138,7 @@ class TestBeamHook:
             py_options=PY_OPTIONS,
             py_interpreter=py_interpreter,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
         )
 
         expected_cmd = [
@@ -148,6 +154,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=None,
             log=ANY,
+            check_job_status_callback=check_job_status_callback,
         )
 
     @pytest.mark.parametrize(
@@ -172,6 +179,7 @@ class TestBeamHook:
         hook = BeamHook(runner=DEFAULT_RUNNER)
         mock_virtualenv.return_value = "/dummy_dir/bin/python"
         process_line_callback = MagicMock()
+        check_job_status_callback = MagicMock()
 
         hook.start_python_pipeline(
             variables=copy.deepcopy(BEAM_VARIABLES_PY),
@@ -180,6 +188,7 @@ class TestBeamHook:
             py_requirements=current_py_requirements,
             py_system_site_packages=current_py_system_site_packages,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
         )
 
         expected_cmd = [
@@ -193,6 +202,7 @@ class TestBeamHook:
         mock_runner.assert_called_once_with(
             cmd=expected_cmd,
             process_line_callback=process_line_callback,
+            check_job_status_callback=check_job_status_callback,
             working_directory=None,
             log=ANY,
         )
@@ -211,6 +221,7 @@ class TestBeamHook:
         hook = BeamHook(runner=DEFAULT_RUNNER)
         wait_for_done = mock_runner.return_value.wait_for_done
         process_line_callback = MagicMock()
+        check_job_status_callback = MagicMock()
 
         with pytest.raises(AirflowException, match=r"Invalid method invocation\."):
             hook.start_python_pipeline(
@@ -219,6 +230,7 @@ class TestBeamHook:
                 py_options=PY_OPTIONS,
                 py_requirements=[],
                 process_line_callback=process_line_callback,
+                check_job_status_callback=check_job_status_callback,
             )
 
         mock_runner.assert_not_called()
@@ -244,7 +256,11 @@ class TestBeamHook:
             '--labels={"foo":"bar"}',
         ]
         mock_runner.assert_called_once_with(
-            cmd=expected_cmd, process_line_callback=process_line_callback, working_directory=None, log=ANY
+            cmd=expected_cmd,
+            process_line_callback=process_line_callback,
+            working_directory=None,
+            log=ANY,
+            check_job_status_callback=None,
         )
 
     @mock.patch(BEAM_STRING.format("run_beam_command"))
@@ -273,6 +289,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=None,
             log=ANY,
+            check_job_status_callback=None,
         )
 
     @mock.patch(BEAM_STRING.format("shutil.which"))
@@ -303,6 +320,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=go_workspace,
             log=ANY,
+            check_job_status_callback=None,
         )
 
     @mock.patch(BEAM_STRING.format("shutil.which"))
@@ -348,6 +366,7 @@ class TestBeamHook:
             process_line_callback=process_line_callback,
             working_directory=None,
             log=ANY,
+            check_job_status_callback=None,
         )
 
 
diff --git a/tests/providers/apache/beam/operators/test_beam.py b/tests/providers/apache/beam/operators/test_beam.py
index d67b3ff647..8b6f57cccc 100644
--- a/tests/providers/apache/beam/operators/test_beam.py
+++ b/tests/providers/apache/beam/operators/test_beam.py
@@ -160,6 +160,7 @@ class TestBeamRunPythonPipelineOperator:
             py_requirements=None,
             py_system_site_packages=False,
             process_line_callback=mock.ANY,
+            check_job_status_callback=mock.ANY,
         )
         dataflow_hook_mock.return_value.provide_authorized_gcloud.assert_called_once_with()
 
diff --git a/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
new file mode 100644
index 0000000000..0489117e5d
--- /dev/null
+++ b/tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG for testing Google Dataflow Beam Pipeline Operator with Python for Streaming job.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow.models.dag import DAG
+from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
+from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
+from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.pubsub import (
+    PubSubCreateTopicOperator,
+    PubSubDeleteTopicOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
+DAG_ID = "dataflow_native_python_streaming"
+
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+
+GCS_TMP = f"gs://{BUCKET_NAME}/temp/"
+GCS_STAGING = f"gs://{BUCKET_NAME}/staging/"
+GCS_PYTHON_SCRIPT = f"gs://{RESOURCE_DATA_BUCKET}/dataflow/python/streaming_wordcount.py"
+LOCATION = "europe-west3"
+TOPIC_ID = f"topic-{DAG_ID}"
+
+default_args = {
+    "dataflow_default_options": {
+        "tempLocation": GCS_TMP,
+        "stagingLocation": GCS_STAGING,
+    }
+}
+
+with DAG(
+    DAG_ID,
+    default_args=default_args,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "dataflow"],
+) as dag:
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
+
+    create_pub_sub_topic = PubSubCreateTopicOperator(
+        task_id="create_topic", topic=TOPIC_ID, project_id=PROJECT_ID, fail_if_exists=False
+    )
+
+    # [START howto_operator_start_streaming_python_job]
+    start_streaming_python_job = BeamRunPythonPipelineOperator(
+        runner=BeamRunnerType.DataflowRunner,
+        task_id="start_streaming_python_job",
+        py_file=GCS_PYTHON_SCRIPT,
+        py_options=[],
+        pipeline_options={
+            "temp_location": GCS_TMP,
+            "input_topic": 
"projects/pubsub-public-data/topics/taxirides-realtime",
+            "output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
+            "streaming": True,
+        },
+        py_requirements=["apache-beam[gcp]==2.46.0"],
+        py_interpreter="python3",
+        py_system_site_packages=False,
+        dataflow_config={"location": LOCATION},
+    )
+    # [END howto_operator_start_streaming_python_job]
+
+    stop_dataflow_job = DataflowStopJobOperator(
+        task_id="stop_dataflow_job",
+        location=LOCATION,
+        job_id="{{ 
task_instance.xcom_pull(task_ids='start_streaming_python_job')['dataflow_job_id']
 }}",
+    )
+
+    delete_topic = PubSubDeleteTopicOperator(task_id="delete_topic", topic=TOPIC_ID, project_id=PROJECT_ID)
+    delete_topic.trigger_rule = TriggerRule.ALL_DONE
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> create_pub_sub_topic
+        # TEST BODY
+        >> start_streaming_python_job
+        # TEST TEARDOWN
+        >> stop_dataflow_job
+        >> delete_topic
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
