MaksYermak commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3371815510
##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None
Review Comment:
@evgeniy-b I do not like the idea of using the job name to check the job status
because, as I already mentioned, the name is not unique. All the manipulation in
this code looks like a workaround: we introduce additional parameters to make the
job name more or less unique, but it is still not unique.
For example, when a user starts two parallel tasks whose jobs have the same job
name but different job IDs, which job will this code grab to check the status?
As I understand it, none of them, because the lookup returns `None` when more
than one job matches. If it took the first job ID from the list instead, both
tasks would monitor the same job, which is wrong. I do not see any way to
distinguish two jobs with the same name between tasks running in parallel, or
for a task to know which job to pick. The callback-based solution was introduced
at the very beginning of the Apache Beam operators, and removing it completely
would be a breaking change for users.
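To make the ambiguity concrete, here is a small in-memory sketch. `Job`, `lookup_unique_by_prefix`, `lookup_first_by_prefix`, the job names and the job IDs are all hypothetical. The first lookup mirrors the rule stated in the PR's docstring (return the ID only when exactly one active job's name starts with the given prefix); the second shows the "take the first job ID from the list" alternative. The set of terminal states is a simplification for the sketch.

```python
from __future__ import annotations

from dataclasses import dataclass

# Simplified, illustrative set of terminal Dataflow job states; any other
# state is treated as "active" in this sketch.
TERMINAL_STATES = frozenset(
    {
        "JOB_STATE_DONE",
        "JOB_STATE_FAILED",
        "JOB_STATE_CANCELLED",
        "JOB_STATE_DRAINED",
        "JOB_STATE_UPDATED",
    }
)


@dataclass(frozen=True)
class Job:
    """Hypothetical stand-in for one entry of a Dataflow job listing."""

    job_id: str
    name: str
    state: str


def _active_matches(jobs: list[Job], prefix: str) -> list[Job]:
    # Active jobs whose name starts with the prefix (the rule from the docstring).
    return [j for j in jobs if j.name.startswith(prefix) and j.state not in TERMINAL_STATES]


def lookup_unique_by_prefix(jobs: list[Job], prefix: str) -> str | None:
    """Return the job ID only if exactly one active job matches, else None."""
    matches = _active_matches(jobs, prefix)
    return matches[0].job_id if len(matches) == 1 else None


def lookup_first_by_prefix(jobs: list[Job], prefix: str) -> str | None:
    """Hypothetical relaxed rule: return the first active match, if any."""
    matches = _active_matches(jobs, prefix)
    return matches[0].job_id if matches else None


# Two parallel tasks submit jobs whose names share the same prefix; the
# suffixes and IDs below are made up for illustration.
jobs = [
    Job(job_id="job-id-task-a", name="daily-etl-1a2b3c4d", state="JOB_STATE_RUNNING"),
    Job(job_id="job-id-task-b", name="daily-etl-5e6f7a8b", state="JOB_STATE_RUNNING"),
]

print(lookup_unique_by_prefix(jobs, "daily-etl"))  # None: neither task resolves its job
print(lookup_first_by_prefix(jobs, "daily-etl"))  # job-id-task-a, for BOTH tasks
```

With the strict rule neither task can resolve its own job; with the relaxed rule both tasks end up watching `job-id-task-a` while `job-id-task-b` goes unmonitored.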
**About the problem you mentioned.**
Which version of the Apache Beam provider do you use on your Airflow cluster? The
problem you described should not happen because of this
[code](https://github.com/apache/airflow/blob/main/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py#L446-L500).
For the Dataflow runner, this code uses a callback to get the job ID from STDOUT
before it starts waiting in either non-deferrable or deferrable mode. This means
that changing the `deferrable` flag from `False` to `True` does not affect the
callback logic at all, because the code always uses callbacks for the Dataflow
runner. Only after it obtains the job ID does it decide whether to wait for the
result in deferrable or non-deferrable mode.
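The ordering described above can be sketched roughly as follows. This is a simplified, hypothetical model, not the provider's code: `JOB_ID_RE`, `make_job_id_callback` and `run_on_dataflow` are illustrative names, the two log formats are assumptions modelled on typical Beam runner output, and in the real operator the callback is invoked while the runner's output is streamed rather than over a prepared list of lines.

```python
from __future__ import annotations

import re
from collections.abc import Callable, Iterable

# Assumed log formats, modelled on what Beam's Java and Python Dataflow runners
# typically print; the exact patterns are illustrative, not the provider's.
JOB_ID_RE = re.compile(
    r"Submitted job: (?P<java>[^\s\"]+)|Created job with id: \[(?P<python>[^\]\"]+)\]"
)


def make_job_id_callback(found: dict[str, str]) -> Callable[[str], None]:
    """Return a per-line callback that records the first Dataflow job ID seen."""

    def on_line(line: str) -> None:
        if "job_id" in found:
            return  # already have an ID; ignore later lines
        match = JOB_ID_RE.search(line)
        if match:
            found["job_id"] = match.group("java") or match.group("python")

    return on_line


def run_on_dataflow(stdout_lines: Iterable[str], deferrable: bool) -> tuple[str, str]:
    """Hypothetical flow: always extract the job ID first, then pick the wait mode."""
    found: dict[str, str] = {}
    on_line = make_job_id_callback(found)
    # The callback is applied to the runner's STDOUT whatever `deferrable` is.
    for line in stdout_lines:
        on_line(line)
    if "job_id" not in found:
        raise RuntimeError("Dataflow job ID not found in runner output")
    # Only after the ID is known does the flow choose how to wait for the job.
    mode = "deferrable" if deferrable else "blocking"
    return found["job_id"], mode


output = [
    "INFO: Staging pipeline files",
    "INFO: Created job with id: [2024-01-01_00_00_00-1234567890]",
]
print(run_on_dataflow(output, deferrable=True))
# ('2024-01-01_00_00_00-1234567890', 'deferrable')
```

Flipping `deferrable` only changes the second element of the result; the job ID is extracted the same way in both cases.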
--
This is an automated message from the Apache Git Service.