e-galan commented on code in PR #37693:
URL: https://github.com/apache/airflow/pull/37693#discussion_r1542570551


##########
airflow/providers/google/cloud/sensors/dataflow.py:
##########
@@ -275,26 +286,64 @@ def poke(self, context: Context) -> bool:
             location=self.location,
         )
 
-        return self.callback(result)
+        return result if self.callback is None else self.callback(result)
+
+    def execute(self, context: Context) -> Any:
+        """Airflow runs this method on the worker and defers using the 
trigger."""
+        if not self.deferrable:
+            super().execute(context)
+        else:
+            self.defer(
+                timeout=self.execution_timeout,
+                trigger=DataflowJobMessagesTrigger(
+                    job_id=self.job_id,
+                    project_id=self.project_id,
+                    location=self.location,
+                    gcp_conn_id=self.gcp_conn_id,
+                    impersonation_chain=self.impersonation_chain,
+                    fail_on_terminal_state=self.fail_on_terminal_state,
+                ),
+                method_name="execute_complete",
+            )

Review Comment:
   @Lee-W 
   I see the benefits of checking the job state once before deferring, however 
the `poke()` method of the class returns 
   
   `return self.callback(result["metrics"])`
   
   where `callback` is an arbitrary function that can be passed by users when 
instantiating the sensor. So we do not know what kind of return value we will 
get from it, will it be boolean, truthy / falsy, or something else.
   
   This callback parameter is present in 3 out of 4 Dataflow sensors. It is 
missing for `DataflowJobStatusSensor` though, so I will definitely add it 
there. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to