dabla commented on issue #40209:
URL: https://github.com/apache/airflow/issues/40209#issuecomment-3160909935

   > Hey [@dabla](https://github.com/dabla) – I looked into your PR that was 
merged related to this issue. Curious to understand: what was the motivation 
for moving the response_check logic to the connection level?
   > 
   > In my case, I’m generating DAGs dynamically via Jinja templates based on 
user input. Each connection acts as a root, and I dynamically attach endpoints 
and response_check functions per task. So, binding response_check at the 
connection level isn’t ideal for this setup, and I'm still running into the 
same issue described here.
   > 
   > For anyone else facing this, here’s the workaround I’m using for now:
   > 
   > ```python
   > from datetime import datetime, timedelta
   >
   > from airflow.exceptions import AirflowSensorTimeout
   > from airflow.providers.http.sensors.http import HttpSensor
   > from airflow.triggers.temporal import TimeDeltaTrigger
   > from airflow.utils.context import Context
   >
   >
   > class DeferredHttpPollingSensor(HttpSensor):
   >     """Custom HttpSensor that defers the polling if the response is not ready."""
   >
   >     def __init__(self, *args, **kwargs):
   >         """Initialize the DeferredHttpPollingSensor."""
   >         kwargs["deferrable"] = True
   >         super().__init__(*args, **kwargs)
   >
   >     def execute(self, context: Context):
   >         """Execute the sensor."""
   >         # Store the start time for timeout tracking
   >         context["ti"].xcom_push(
   >             key="deferred_http_polling_sensor_start",
   >             value=datetime.now().isoformat(),
   >         )
   >
   >         if self.poke(context):
   >             return True
   >
   >         self.defer(
   >             trigger=TimeDeltaTrigger(delta=timedelta(seconds=self.poke_interval)),
   >             method_name="execute_complete",
   >         )
   >
   >     def execute_complete(self, context: Context, event=None):
   >         """Resume after the trigger fires: re-check the response or defer again."""
   >         # Enforce the sensor timeout manually, since it spans deferrals
   >         start_time_str = context["ti"].xcom_pull(key="deferred_http_polling_sensor_start")
   >         if start_time_str:
   >             start_time = datetime.fromisoformat(start_time_str)
   >             elapsed_time = (datetime.now() - start_time).total_seconds()
   >
   >             if elapsed_time >= self.timeout:
   >                 raise AirflowSensorTimeout(
   >                     f"Snap. Time is OUT. DAG: {context['dag'].dag_id}, "
   >                     f"Task: {context['task'].task_id}, "
   >                     f"Duration: {elapsed_time}s, "
   >                     f"Timeout: {self.timeout}s"
   >                 )
   >
   >         if self.poke(context):
   >             return True
   >
   >         self.defer(
   >             trigger=TimeDeltaTrigger(delta=timedelta(seconds=self.poke_interval)),
   >             method_name="execute_complete",
   >         )
   > ```
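   The key detail in the quoted workaround is that the sensor's timeout must be re-checked manually on each resume, because every deferral starts a fresh invocation of the task. The elapsed-time check reduces to the stdlib-only logic below (a minimal sketch; `has_timed_out` is a hypothetical helper name, not part of the sensor or of Airflow):

```python
from datetime import datetime
from typing import Optional


def has_timed_out(start_iso: str, timeout_seconds: float,
                  now: Optional[datetime] = None) -> bool:
    """Return True once timeout_seconds have elapsed since the ISO start time.

    This mirrors the check in execute_complete: the start time survives
    deferrals as an ISO string (pushed to XCom), so it is parsed back and
    compared against wall-clock time on every resume.
    """
    start = datetime.fromisoformat(start_iso)
    elapsed = ((now or datetime.now()) - start).total_seconds()
    return elapsed >= timeout_seconds
```

   Passing `now` explicitly makes the check deterministic to test; in the sensor it defaults to the current time.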
   
   
   
   Hello @k4rtikp4til, the issue is still open and my PR has not been merged yet, so it is expected that you are still running into it.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
