[ 
https://issues.apache.org/jira/browse/AIRFLOW-3293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16676783#comment-16676783
 ] 

Ash Berlin-Taylor commented on AIRFLOW-3293:
--------------------------------------------

AIRFLOW-2747 and AIRFLOW-850 would help with your second point. As of the 
current release, even if the sensor behaved the way you want, it would still 
take up an executor slot, because that is how sensors work.

I am not sure whether this is a common enough use case to support directly, 
given the two tickets mentioned above.

> Rename TimeDeltaSensor to ScheduleTimeDeltaSensor
> -------------------------------------------------
>
>                 Key: AIRFLOW-3293
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3293
>             Project: Apache Airflow
>          Issue Type: Wish
>            Reporter: Darren Weber
>            Priority: Major
>
> The TimeDeltaSensor has baked-in lookups for the schedule and 
> schedule_interval in the class init, so it is not a pure time delta. It 
> would be ideal to have a TimeDelta sensor that is purely relative to the 
> time at which an upstream task triggers it. If there is a way to do this, 
> please note it here or suggest an alternative implementation that could 
> achieve this easily.
> The implementation below, using a PythonOperator, works, but it needlessly 
> occupies a worker for roughly 5 minutes. It would be much better to have a 
> TimeDelta sensor that accepts the time at which an upstream sensor triggers 
> it and then waits for a timedelta, with the base sensor's options for poke 
> interval and timeout. With the reschedule option, it would not need to hold 
> a worker for the whole wait. Something like this can help add jitter to 
> downstream tasks that could otherwise all hit an HTTP endpoint at once.
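> {code:python}
> import random
> import time
>
> # Airflow 1.x import path; adjust for your version.
> from airflow.operators.python_operator import PythonOperator
>
>
> def wait5(*args, **kwargs):
>     """Sleep for a random 3 to 6 minutes to add jitter."""
>     minutes = random.randint(3, 6)  # inclusive on both ends
>     time.sleep(minutes * 60)
>     return True
>
>
> # a_dag and upstream_http_sensor are defined elsewhere in the DAG file.
> wait5_task = PythonOperator(
>     task_id="python_op_wait_5min",
>     python_callable=wait5,
>     dag=a_dag,
> )
> upstream_http_sensor >> wait5_task
> {code}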
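
The sensor requested above comes down to one check per poke: has the timedelta elapsed since the upstream task triggered? Below is a minimal sketch of that check in plain Python, with a simulated clock so it runs without Airflow. The names delta_elapsed and pokes_until_done are hypothetical and are not part of Airflow; in a real sensor this logic would presumably live in the poke method, and how the upstream trigger time gets passed in is left open here.

```python
from datetime import datetime, timedelta, timezone


def delta_elapsed(reference_time, delta, now=None):
    """Return True once `delta` has passed since `reference_time`.

    `reference_time` stands in for the moment the upstream task
    triggered this one (an assumption; Airflow does not supply it
    under this name).
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return now >= reference_time + delta


def pokes_until_done(reference_time, delta, poke_interval, first_poke):
    """Simulate poking every `poke_interval` and count the pokes needed."""
    now = first_poke
    pokes = 1
    while not delta_elapsed(reference_time, delta, now):
        now += poke_interval  # advance the simulated clock, no real sleep
        pokes += 1
    return pokes


if __name__ == "__main__":
    triggered_at = datetime(2018, 11, 6, 12, 0, tzinfo=timezone.utc)
    count = pokes_until_done(
        reference_time=triggered_at,
        delta=timedelta(minutes=5),
        poke_interval=timedelta(seconds=60),
        first_poke=triggered_at,
    )
    print(count)
```

With a 5-minute delta and a 60-second poke interval, the simulated sensor succeeds on the sixth poke. The check itself is stateless, so it would fit a reschedule-style mode in which the worker is released between pokes instead of sleeping for the whole wait.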



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
