Lee-W commented on code in PR #36904:
URL: https://github.com/apache/airflow/pull/36904#discussion_r1460266199


##########
airflow/providers/http/sensors/http.py:
##########
@@ -148,3 +158,25 @@ def poke(self, context: Context) -> bool:
             raise exc
 
         return True
+
+    def execute(self, context: Context) -> None:
+        if not self.deferrable or self.response_check:
+            super().execute(context=context)
+        elif not self.poke(context):
+            self.defer(
+                timeout=timedelta(seconds=self.timeout),
+                trigger=HttpSensorTrigger(
+                    endpoint=self.endpoint,
+                    http_conn_id=self.http_conn_id,
+                    data=self.request_params,
+                    headers=self.headers,
+                    method=self.method,
+                    extra_options=self.extra_options,
+                    poke_interval=self.poke_interval,
+                ),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context: Context, event: bool | None = None) -> 
None:

Review Comment:
   may i know why `event` is annotated as `bool`here?



##########
airflow/providers/http/sensors/http.py:
##########
@@ -148,3 +158,25 @@ def poke(self, context: Context) -> bool:
             raise exc
 
         return True
+
+    def execute(self, context: Context) -> None:
+        if not self.deferrable or self.response_check:
+            super().execute(context=context)
+        elif not self.poke(context):
+            self.defer(
+                timeout=timedelta(seconds=self.timeout),
+                trigger=HttpSensorTrigger(
+                    endpoint=self.endpoint,
+                    http_conn_id=self.http_conn_id,
+                    data=self.request_params,
+                    headers=self.headers,
+                    method=self.method,
+                    extra_options=self.extra_options,
+                    poke_interval=self.poke_interval,
+                ),
+                method_name="execute_complete",
+            )
+
+    def execute_complete(self, context: Context, event: bool | None = None) -> 
None:
+        self.log.info("%s completed successfully.", self.task_id)
+        return None

Review Comment:
   ```suggestion
   ```
   
   nitpick: it might not be necessary



##########
tests/providers/http/sensors/test_http.py:
##########
@@ -330,3 +331,63 @@ def test_sensor(self):
             dag=self.dag,
         )
         sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
+
+
+class TestHttpSensorAsync:
+    @mock.patch("airflow.providers.http.sensors.http.HttpSensor.defer")
+    @mock.patch(
+        "airflow.providers.http.sensors.http.HttpSensor.poke",
+        return_value=True,
+    )
+    def test_http_sensor_async_finish_before_deferred(
+        self,
+        mock_poke,
+        mock_defer,
+    ):
+        """
+        Asserts that a task is deferred and a HttpTrigger will be fired
+        when the HttpSensor is executed in deferrable mode.
+        """
+
+        task = HttpSensor(task_id="run_now", endpoint="test-endpoint", 
deferrable=True)
+
+        task.execute({})
+        assert not mock_defer.called
+
+    @mock.patch(
+        "airflow.providers.http.sensors.http.HttpSensor.poke",
+        return_value=False,
+    )
+    def test_http_run_now_sensor_async(self, mock_poke):
+        """
+        Asserts that a task is deferred and a HttpTrigger will be fired
+        when the HttpSensor is executed in deferrable mode.
+        """
+
+        task = HttpSensor(task_id="run_now", endpoint="test-endpoint", 
deferrable=True)
+
+        with pytest.raises(TaskDeferred) as exc:
+            task.execute({})
+
+        assert isinstance(exc.value.trigger, HttpSensorTrigger), "Trigger is 
not a HttpTrigger"
+
+    @mock.patch("airflow.providers.http.sensors.http.HttpSensor.defer")
+    @mock.patch("airflow.sensors.base.BaseSensorOperator.execute")
+    def test_sensor_not_defer(self, mock_execute, mock_defer):
+        task = HttpSensor(
+            task_id="run_now",
+            endpoint="test-endpoint",
+            response_check=lambda response: "httpbin" in response.text,
+            deferrable=True,
+        )
+        task.execute({})
+        mock_execute.assert_called_once()
+        mock_defer.assert_not_called()
+
+    @mock.patch("airflow.providers.http.sensors.http.HttpSensor.poke")
+    def test_sensor_defer(self, mock_poke):
+        mock_poke.return_value = False

Review Comment:
   ```suggestion
   ```



##########
tests/providers/http/sensors/test_http.py:
##########
@@ -330,3 +331,63 @@ def test_sensor(self):
             dag=self.dag,
         )
         sensor.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, 
ignore_ti_state=True)
+
+
+class TestHttpSensorAsync:
+    @mock.patch("airflow.providers.http.sensors.http.HttpSensor.defer")
+    @mock.patch(
+        "airflow.providers.http.sensors.http.HttpSensor.poke",
+        return_value=True,
+    )
+    def test_http_sensor_async_finish_before_deferred(
+        self,
+        mock_poke,
+        mock_defer,
+    ):
+        """
+        Asserts that a task is deferred and a HttpTrigger will be fired
+        when the HttpSensor is executed in deferrable mode.
+        """
+
+        task = HttpSensor(task_id="run_now", endpoint="test-endpoint", 
deferrable=True)
+
+        task.execute({})
+        assert not mock_defer.called
+
+    @mock.patch(
+        "airflow.providers.http.sensors.http.HttpSensor.poke",
+        return_value=False,
+    )
+    def test_http_run_now_sensor_async(self, mock_poke):
+        """
+        Asserts that a task is deferred and a HttpTrigger will be fired
+        when the HttpSensor is executed in deferrable mode.
+        """
+
+        task = HttpSensor(task_id="run_now", endpoint="test-endpoint", 
deferrable=True)
+
+        with pytest.raises(TaskDeferred) as exc:
+            task.execute({})
+
+        assert isinstance(exc.value.trigger, HttpSensorTrigger), "Trigger is 
not a HttpTrigger"
+
+    @mock.patch("airflow.providers.http.sensors.http.HttpSensor.defer")
+    @mock.patch("airflow.sensors.base.BaseSensorOperator.execute")
+    def test_sensor_not_defer(self, mock_execute, mock_defer):
+        task = HttpSensor(
+            task_id="run_now",
+            endpoint="test-endpoint",
+            response_check=lambda response: "httpbin" in response.text,
+            deferrable=True,
+        )
+        task.execute({})
+        mock_execute.assert_called_once()
+        mock_defer.assert_not_called()
+
+    @mock.patch("airflow.providers.http.sensors.http.HttpSensor.poke")

Review Comment:
   ```suggestion
       @mock.patch("airflow.providers.http.sensors.http.HttpSensor.poke", 
return_value=False)
   ```
   
   I think we should unify one way to write it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to