k4rtikp4til commented on issue #40209:
URL: https://github.com/apache/airflow/issues/40209#issuecomment-3158739953

   @dabla I looked at your PR that got merged, with regards to this issue what 
was the reason for going with check response to be defined at the connection 
level?
   
   I am creating the dag files based on user input using jinja and connection 
is just the root to which I dynamically add endpoints and check response 
functions. Thus, I am still running into this issue. For anyone who comes 
across the issue again, this is what I have ended up with for now:
   
   `class DeferredHttpPollingSensor(HttpSensor):
       """
       Custom HttpSensor that defers the polling if the response is not ready.
       """
   
       def __init__(self, *args, **kwargs):
           """Initialize the DeferredHttpPollingSensor."""
           kwargs["deferrable"] = True
           super().__init__(*args, **kwargs)
   
       def execute(self, context: Context):
           """Execute the sensor."""
           # Store the start time for timeout tracking
           context["ti"].xcom_push(
               key="deferred_http_polling_sensor_start", 
value=datetime.now().isoformat()
           )
   
           if self.poke(context):
               return True
   
           self.defer(
               
trigger=TimeDeltaTrigger(delta=timedelta(seconds=self.poke_interval)),
               method_name="execute_complete",
           )
   
       def execute_complete(self, context: Context, event=None):
           """Execute the complete sensor."""
           # Check timeout
           start_time_str = 
context["ti"].xcom_pull(key="deferred_http_polling_sensor_start")
           if start_time_str:
               start_time = datetime.fromisoformat(start_time_str)
               elapsed_time = (datetime.now() - start_time).total_seconds()
   
               if elapsed_time >= self.timeout:
                   raise AirflowSensorTimeout(
                       f"Snap. Time is OUT. DAG: {context['dag'].dag_id}, "
                       f"Task: {context['task'].task_id}, "
                       f"Duration: {elapsed_time}s, "
                       f"Timeout: {self.timeout}s"
                   )
   
           if self.poke(context):
               return True
   
           self.defer(
               
trigger=TimeDeltaTrigger(delta=timedelta(seconds=self.poke_interval)),
               method_name="execute_complete",
           )`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to