MaksYermak commented on code in PR #67711:
URL: https://github.com/apache/airflow/pull/67711#discussion_r3371815510


##########
providers/google/src/airflow/providers/google/cloud/hooks/dataflow.py:
##########
@@ -1117,6 +1117,38 @@ def is_job_dataflow_running(
         )
         return jobs_controller.is_job_running()
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def fetch_job_id_by_name(
+        self,
+        name: str,
+        project_id: str,
+        location: str = DEFAULT_DATAFLOW_LOCATION,
+    ) -> str | None:
+        """
+        Look up a single Dataflow job id by name prefix.
+
+        Returns the id when exactly one active job's name starts with ``name``;
+        ``None`` otherwise.
+        """
+        jobs_controller = _DataflowJobsController(
+            dataflow=self.get_conn(),
+            project_number=project_id,
+            name=name,
+            location=location,
+            poll_sleep=self.poll_sleep,
+            drain_pipeline=self.drain_pipeline,
+            num_retries=self.num_retries,
+            cancel_timeout=self.cancel_timeout,
+        )
+        try:
+            jobs = jobs_controller._get_current_jobs()
+        except Exception:
+            self.log.warning("Failed to look up Dataflow job id by name %r.", name, exc_info=True)
+            return None
+        if len(jobs) != 1:
+            return None

Review Comment:
   @evgeniy-b I do not like the idea of using the job name to check job status because, as I already mentioned, the name is not unique. All the manipulations in the code look like workarounds: we keep introducing additional parameters to make the job name more or less unique, but it still is not.
   
   For example, when a user starts two parallel tasks whose jobs have the same job name but different job IDs, which job does this code pick to check the status? As I understand it, neither of them (the lookup returns `None` when more than one job matches), or, if it took the first job ID from the job list instead, both tasks would end up monitoring the same job, which is wrong. I do not see any way to distinguish two jobs with the same name between tasks in a parallel run, or how a task should know which job to pick. The callback-based solution was introduced at the very beginning of the Apache Beam operators' life, and removing it completely would be a breaking change for users.
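To make the ambiguity concrete, here is a minimal, self-contained sketch that applies the same "exactly one name-prefix match" rule as `fetch_job_id_by_name` in the diff to an in-memory job list. The job dicts, the ids, and the helpers `find_job_id_by_name` and `find_job_by_id` are hypothetical stand-ins, not the hook's API; the point is only that two jobs sharing a name leave the name lookup with nothing to return, while a server-assigned job id stays unambiguous.

```python
from typing import Optional

# Hypothetical stand-in for active Dataflow jobs: only "id" and "name" are modeled.
JobRecord = dict


def find_job_id_by_name(jobs: list, name: str) -> Optional[str]:
    """Hypothetical helper mimicking the PR's rule: an id only if exactly one name starts with `name`."""
    matches = [job for job in jobs if job["name"].startswith(name)]
    if len(matches) != 1:
        return None
    return matches[0]["id"]


def find_job_by_id(jobs: list, job_id: str) -> Optional[JobRecord]:
    """Hypothetical helper: look up by the server-assigned id, which identifies one job."""
    return next((job for job in jobs if job["id"] == job_id), None)


# Two parallel Airflow tasks submitted jobs with the same name (ids are made up).
active_jobs = [
    {"id": "2024-01-01_00_00_00-111", "name": "nightly-etl"},
    {"id": "2024-01-01_00_00_05-222", "name": "nightly-etl"},
]

print(find_job_id_by_name(active_jobs, "nightly-etl"))  # None: neither task gets an id
print(find_job_by_id(active_jobs, "2024-01-01_00_00_05-222")["name"])  # nightly-etl
```

Returning `None` on ambiguity avoids silently attaching both tasks to the same job, but it also means neither task can monitor its own job, which is exactly the gap the job-id callback closes.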
   
   **About the problem you mentioned.**
   Which version of the Apache Beam provider are you using on your Airflow cluster? I ask because the problem you described should not happen, thanks to this [code](https://github.com/apache/airflow/blob/main/providers/apache/beam/src/airflow/providers/apache/beam/operators/beam.py#L446-L500). This code uses a callback to read the job ID from STDOUT for the Dataflow runner before it starts waiting in either non-deferrable or deferrable mode. This means that changing the `deferrable` flag from `False` to `True` does not affect the callback logic at all, because the code always uses callbacks for the Dataflow runner. Only after obtaining the job ID does it decide whether to wait for the result in deferrable or non-deferrable mode.
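The ordering described above (capture the job ID from the pipeline's output first, then choose how to wait) can be sketched as follows. This is an illustration, not the provider's code: the regex and the two log-line formats it matches are assumptions about what the Beam SDKs print, and `make_job_id_callback` and `run_pipeline` are hypothetical names.

```python
import re
from typing import Callable, Optional

# Assumed log formats for illustration only; real SDK output may differ by version.
JOB_ID_PATTERN = re.compile(
    r"Submitted job: (?P<java>\S+)|Created job with id: \[(?P<python>[^\]\s]+)\]"
)


def make_job_id_callback(on_job_id: Callable[[str], None]) -> Callable[[str], None]:
    """Hypothetical factory: build a per-line callback that reports a job id found on STDOUT."""

    def process_line(line: str) -> None:
        match = JOB_ID_PATTERN.search(line)
        if match:
            on_job_id(match.group("java") or match.group("python"))

    return process_line


def run_pipeline(stdout_lines: list, deferrable: bool) -> str:
    """Hypothetical flow: the callback always runs; `deferrable` only matters afterwards."""
    captured: dict = {"job_id": None}

    def store(job_id: str) -> None:
        captured["job_id"] = job_id

    callback = make_job_id_callback(store)
    for line in stdout_lines:  # same callback path whether or not deferrable is set
        callback(line)

    job_id: Optional[str] = captured["job_id"]
    if job_id is None:
        raise RuntimeError("Dataflow job id was not found in the pipeline output")
    # Only now is the wait mode chosen, using the id captured above.
    mode = "defer to trigger" if deferrable else "poll in worker"
    return f"{mode}: {job_id}"


output = [
    "INFO: Staging pipeline...",
    "INFO: Created job with id: [2024-01-01_00_00_00-111]",
]
print(run_pipeline(output, deferrable=False))  # poll in worker: 2024-01-01_00_00_00-111
print(run_pipeline(output, deferrable=True))  # defer to trigger: 2024-01-01_00_00_00-111
```

Because each task reads the ID from its own subprocess output, two jobs with identical names still resolve to two distinct IDs, which is the property a name-based lookup cannot provide.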
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
