dabla commented on issue #40209: URL: https://github.com/apache/airflow/issues/40209#issuecomment-3160909935
> Hey [@dabla](https://github.com/dabla) – I looked into your PR that was merged related to this issue. Curious to understand: what was the motivation for moving the response_check logic to the connection level? > > In my case, I’m generating DAGs dynamically via Jinja templates based on user input. Each connection acts as a root, and I dynamically attach endpoints and response_check functions per task. So, binding response_check at the connection level isn’t ideal for this setup, and I'm still running into the same issue described here. > > For anyone else facing this, here’s the workaround I’m using for now: > > `class DeferredHttpPollingSensor(HttpSensor): """ Custom HttpSensor that defers the polling if the response is not ready. """ > > ``` > def __init__(self, *args, **kwargs): > """Initialize the DeferredHttpPollingSensor.""" > kwargs["deferrable"] = True > super().__init__(*args, **kwargs) > > def execute(self, context: Context): > """Execute the sensor.""" > # Store the start time for timeout tracking > context["ti"].xcom_push( > key="deferred_http_polling_sensor_start", value=datetime.now().isoformat() > ) > > if self.poke(context): > return True > > self.defer( > trigger=TimeDeltaTrigger(delta=timedelta(seconds=self.poke_interval)), > method_name="execute_complete", > ) > > def execute_complete(self, context: Context, event=None): > """Execute the complete sensor.""" > # Check timeout > start_time_str = context["ti"].xcom_pull(key="deferred_http_polling_sensor_start") > if start_time_str: > start_time = datetime.fromisoformat(start_time_str) > elapsed_time = (datetime.now() - start_time).total_seconds() > > if elapsed_time >= self.timeout: > raise AirflowSensorTimeout( > f"Snap. Time is OUT. DAG: {context['dag'].dag_id}, " > f"Task: {context['task'].task_id}, " > f"Duration: {elapsed_time}s, " > f"Timeout: {self.timeout}s" > ) > > if self.poke(context): > return True > > self.defer( > trigger=TimeDeltaTrigger(delta=timedelta(seconds=self.poke_interval)), > method_name="execute_complete", > )` > ``` > Hey [@dabla](https://github.com/dabla) – I looked into your PR that was merged related to this issue. Curious to understand: what was the motivation for moving the response_check logic to the connection level? > > In my case, I’m generating DAGs dynamically via Jinja templates based on user input. Each connection acts as a root, and I dynamically attach endpoints and response_check functions per task. So, binding response_check at the connection level isn’t ideal for this setup, and I'm still running into the same issue described here. > > For anyone else facing this, here’s the workaround I’m using for now: > > `class DeferredHttpPollingSensor(HttpSensor): """ Custom HttpSensor that defers the polling if the response is not ready. """ > > ``` > def __init__(self, *args, **kwargs): > """Initialize the DeferredHttpPollingSensor.""" > kwargs["deferrable"] = True > super().__init__(*args, **kwargs) > > def execute(self, context: Context): > """Execute the sensor.""" > # Store the start time for timeout tracking > context["ti"].xcom_push( > key="deferred_http_polling_sensor_start", value=datetime.now().isoformat() > ) > > if self.poke(context): > return True > > self.defer( > trigger=TimeDeltaTrigger(delta=timedelta(seconds=self.poke_interval)), > method_name="execute_complete", > ) > > def execute_complete(self, context: Context, event=None): > """Execute the complete sensor.""" > # Check timeout > start_time_str = context["ti"].xcom_pull(key="deferred_http_polling_sensor_start") > if start_time_str: > start_time = datetime.fromisoformat(start_time_str) > elapsed_time = (datetime.now() - start_time).total_seconds() > > if elapsed_time >= self.timeout: > raise AirflowSensorTimeout( > f"Snap. Time is OUT. DAG: {context['dag'].dag_id}, " > f"Task: {context['task'].task_id}, " > f"Duration: {elapsed_time}s, " > f"Timeout: {self.timeout}s" > ) > > if self.poke(context): > return True > > self.defer( > trigger=TimeDeltaTrigger(delta=timedelta(seconds=self.poke_interval)), > method_name="execute_complete", > )` > ``` Hello @k4rtikp4til, the issue is still open and my PR didn’t get merged yet, so to me it seems normal that you still run into this issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
