shahar1 commented on code in PR #41576:
URL: https://github.com/apache/airflow/pull/41576#discussion_r1726678936


##########
airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -938,6 +937,110 @@ def launch_job_with_flex_template(
         response: dict = request.execute(num_retries=self.num_retries)
         return response["job"]
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def launch_beam_yaml_job(
+        self,
+        *,
+        job_name: str,
+        yaml_pipeline_file: str,
+        append_job_name: bool,
+        jinja_variables: dict[str, str] | None,
+        options: dict[str, Any] | None,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str:
+        """
+        Launch a Dataflow YAML job and run it until completion.
+
+        :param job_name: The unique name to assign to the Cloud Dataflow job.
+        :param yaml_pipeline_file: Path to a file defining the YAML pipeline to run.
+            Must be a local file or a URL beginning with 'gs://'.
+        :param append_job_name: Set to True if a unique suffix should be appended to the `job_name`.
+        :param jinja_variables: A dictionary of Jinja2 variables to be used in reifying the yaml pipeline file.
+        :param options: Additional gcloud or Beam job parameters.
+            It must be a dictionary whose keys match the optional flag names in gcloud.
+            The list of supported flags can be found at: `https://cloud.google.com/sdk/gcloud/reference/dataflow/yaml/run`.
+            Note that if a flag does not require a value, its dictionary value must be either True or None.
+            For example, the `--log-http` flag can be passed as {'log-http': True}.
+        :param project_id: The ID of the GCP project that owns the job.
+        :param location: Region ID of the job's regional endpoint. Defaults to 'us-central1'.
+        :return: Job ID.
+        """
+        job_name = self.build_dataflow_job_name(job_name, append_job_name)
+        cmd = self._build_yaml_gcloud_command(
+            job_name=job_name,
+            yaml_pipeline_file=yaml_pipeline_file,
+            project_id=project_id,
+            region=location,
+            options=options,
+            jinja_variables=jinja_variables,
+        )
+        job_id = self._create_dataflow_job_with_gcloud(cmd=cmd)
+        return job_id
+
+    def _build_yaml_gcloud_command(
+        self,
+        job_name: str,
+        yaml_pipeline_file: str,
+        project_id: str,
+        region: str,
+        options: dict[str, Any] | None,
+        jinja_variables: dict[str, str] | None,
+    ) -> list[str]:
+        gcp_flags = {
+            "yaml-pipeline-file": yaml_pipeline_file,
+            "project": project_id,
+            "format": "value(job.id)",
+            "region": region,
+        }
+
+        if jinja_variables:
+            gcp_flags["jinja-variables"] = json.dumps(jinja_variables)
+
+        if options:
+            gcp_flags.update(options)
+
+        if self.impersonation_chain:
+            if isinstance(self.impersonation_chain, str):
+                impersonation_account = self.impersonation_chain
+            elif len(self.impersonation_chain) == 1:
+                impersonation_account = self.impersonation_chain[0]
+            else:
+                raise AirflowException(
+                    "Chained list of accounts is not supported, please specify only one service account."
+                )
+            gcp_flags.update({"impersonate-service-account": impersonation_account})
+
+        return [
+            "gcloud",
+            "dataflow",
+            "yaml",
+            "run",
+            job_name,
+            *(beam_options_to_args(gcp_flags)),
+        ]
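
For illustration, assuming the current behavior of `beam_options_to_args` (the helper this command builder calls): it renders `True`/`None` values as bare flags and everything else as `--key=value` pairs, which is why a valueless flag such as `--log-http` is passed as `{'log-http': True}`. A minimal sketch with made-up bucket and project names:

    from airflow.providers.apache.beam.hooks.beam import beam_options_to_args

    # Illustrative values only; the bucket and project names are made up.
    flags = {
        "yaml-pipeline-file": "gs://example-bucket/pipeline.yaml",
        "project": "example-project",
        "region": "us-central1",
        "log-http": True,  # valueless flag: True (or None) emits a bare --log-http
    }
    print(beam_options_to_args(flags))
    # ['--yaml-pipeline-file=gs://example-bucket/pipeline.yaml',
    #  '--project=example-project', '--region=us-central1', '--log-http']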

Review Comment:
   This logic is taken from `DataflowHook.start_sql_job`. Maybe you could extract it into a utility function that both can use (with the required adjustments)?
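
   A minimal sketch of what that shared helper could look like (the name `_resolve_impersonation_account` is only a suggestion, not existing API):

   ```python
   from __future__ import annotations

   from collections.abc import Sequence

   from airflow.exceptions import AirflowException


   def _resolve_impersonation_account(
       impersonation_chain: str | Sequence[str] | None,
   ) -> str | None:
       """Return the single service account gcloud should impersonate, or None.

       gcloud accepts only one account, so a chained list is rejected, mirroring
       the check currently duplicated in start_sql_job and launch_beam_yaml_job.
       """
       if not impersonation_chain:
           return None
       if isinstance(impersonation_chain, str):
           return impersonation_chain
       if len(impersonation_chain) == 1:
           return impersonation_chain[0]
       raise AirflowException(
           "Chained list of accounts is not supported, please specify only one service account."
       )
   ```

   Both call sites would then reduce to `account = _resolve_impersonation_account(self.impersonation_chain)` and set `gcp_flags["impersonate-service-account"] = account` when it is not None.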


