[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541539#comment-16541539
 ] 

Stefan Seelmann edited comment on AIRFLOW-2747 at 7/12/18 8:27 PM:
-------------------------------------------------------------------

Screenshot of the Gantt view for an example DAG run:

  !Screenshot_2018-07-12_14-10-24.png!

And the corresponding rows in the task_reschedule, task_fail, and task_instance 
tables:
{noformat}
$ select * from task_reschedule where execution_date='2018-07-12T12:06:28.988028' order by id;
 id | task_id | dag_id |        execution_date         | try_number |          start_date           |           end_date            | duration |        reschedule_date        
----+---------+--------+-------------------------------+------------+-------------------------------+-------------------------------+----------+-------------------------------
 42 | s3      | dummy  | 2018-07-12 12:06:28.988028+00 |          1 | 2018-07-12 12:06:54.430185+00 | 2018-07-12 12:06:59.339554+00 |        5 | 2018-07-12 12:07:14.312456+00
 44 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          2 | 2018-07-12 12:07:09.381193+00 | 2018-07-12 12:07:12.480702+00 |        3 | 2018-07-12 12:07:22.467206+00
 45 | s3      | dummy  | 2018-07-12 12:06:28.988028+00 |          1 | 2018-07-12 12:07:17.111816+00 | 2018-07-12 12:07:18.444199+00 |        1 | 2018-07-12 12:07:33.4376+00
 47 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          3 | 2018-07-12 12:07:34.499979+00 | 2018-07-12 12:07:35.834609+00 |        1 | 2018-07-12 12:07:45.817533+00
 49 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          3 | 2018-07-12 12:07:49.407569+00 | 2018-07-12 12:07:50.843526+00 |        1 | 2018-07-12 12:08:00.834584+00
 51 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          4 | 2018-07-12 12:08:14.526+00    | 2018-07-12 12:08:15.768907+00 |        1 | 2018-07-12 12:08:25.762619+00
 53 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          4 | 2018-07-12 12:08:29.329766+00 | 2018-07-12 12:08:31.168762+00 |        2 | 2018-07-12 12:08:41.160209+00
{noformat}
{noformat}
$ select * from task_fail where execution_date='2018-07-12T12:06:28.988028' order by id;
 id  | task_id | dag_id |        execution_date         |          start_date           |           end_date            | duration 
-----+---------+--------+-------------------------------+-------------------------------+-------------------------------+----------
 173 | t1      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:06:33.005215+00 | 2018-07-12 12:06:36.503438+00 |        3
 179 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:06:54.860487+00 | 2018-07-12 12:06:59.352183+00 |        4
 181 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:07:25.124649+00 | 2018-07-12 12:07:26.606175+00 |        1
 182 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:08:04.295306+00 | 2018-07-12 12:08:05.610363+00 |        1
{noformat}
{noformat}
$ select task_id,dag_id,execution_date,start_date,end_date,duration,state,try_number from task_instance where dag_id='dummy' and execution_date='2018-07-12T12:06:28.988028';
 task_id | dag_id |        execution_date         |          start_date           |           end_date            | duration |  state  | try_number 
---------+--------+-------------------------------+-------------------------------+-------------------------------+----------+---------+------------
 s2      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:08:44.828189+00 | 2018-07-12 12:08:46.609474+00 | 1.781285 | success |          4
 t2      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:08:50.711506+00 | 2018-07-12 12:08:54.888104+00 | 4.176598 | success |          1
 b1      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:08:57.965998+00 | 2018-07-12 12:08:59.547209+00 | 1.581211 | success |          1
 t1      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:06:44.652687+00 | 2018-07-12 12:06:48.328103+00 | 3.675416 | success |          2
 sub1    | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:09:03.322963+00 | 2018-07-12 12:09:40.248113+00 | 36.92515 | success |          1
 s1      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:06:54.345113+00 | 2018-07-12 12:06:58.871657+00 | 4.526544 | success |          1
 s3      | dummy  | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:07:37.190335+00 | 2018-07-12 12:07:38.725783+00 | 1.535448 | success |          1
{noformat}
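
When reading the task_reschedule rows above, it can help to compute how long after each run ended the next run was requested. The following is only a small sketch: three rows are copied by hand from the output above, and the helper names `parse_ts` and `reschedule_gap` are made up for illustration.

```python
from datetime import datetime

# (id, task_id, end_date, reschedule_date), copied from the task_reschedule output.
ROWS = [
    (42, "s3", "2018-07-12 12:06:59.339554+00", "2018-07-12 12:07:14.312456+00"),
    (44, "s2", "2018-07-12 12:07:12.480702+00", "2018-07-12 12:07:22.467206+00"),
    (45, "s3", "2018-07-12 12:07:18.444199+00", "2018-07-12 12:07:33.4376+00"),
]


def parse_ts(text):
    """Parse a psql timestamptz string; every value in ROWS is in UTC (+00)."""
    without_offset = text.rsplit("+", 1)[0]
    return datetime.strptime(without_offset, "%Y-%m-%d %H:%M:%S.%f")


def reschedule_gap(end_date, reschedule_date):
    """Seconds between the end of a run and the requested reschedule date."""
    return (parse_ts(reschedule_date) - parse_ts(end_date)).total_seconds()


for row_id, task_id, end_date, reschedule_date in ROWS:
    print(row_id, task_id, round(reschedule_gap(end_date, reschedule_date), 2))
```

For these three rows the gap comes out at roughly 15 seconds for s3 and 10 seconds for s2. That would be consistent with the two sensors using different poke intervals, but the DAG definition is not shown here, so this is only a guess.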
 

 


was (Author: seelmann):
Screenshot of the Gantt view for an example DAG run:

  !Screenshot_2018-07-12_14-10-24.png!


 And the corresponding rows in task_reschedule table:
{noformat}
$ select * from task_reschedule where execution_date='2018-07-12T12:06:28.988028' order by id;
 id | task_id | dag_id |        execution_date         | try_number |          start_date           |           end_date            | duration |        reschedule_date        
----+---------+--------+-------------------------------+------------+-------------------------------+-------------------------------+----------+-------------------------------
 42 | s3      | dummy  | 2018-07-12 12:06:28.988028+00 |          1 | 2018-07-12 12:06:54.430185+00 | 2018-07-12 12:06:59.339554+00 |        5 | 2018-07-12 12:07:14.312456+00
 44 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          2 | 2018-07-12 12:07:09.381193+00 | 2018-07-12 12:07:12.480702+00 |        3 | 2018-07-12 12:07:22.467206+00
 45 | s3      | dummy  | 2018-07-12 12:06:28.988028+00 |          1 | 2018-07-12 12:07:17.111816+00 | 2018-07-12 12:07:18.444199+00 |        1 | 2018-07-12 12:07:33.4376+00
 47 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          3 | 2018-07-12 12:07:34.499979+00 | 2018-07-12 12:07:35.834609+00 |        1 | 2018-07-12 12:07:45.817533+00
 49 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          3 | 2018-07-12 12:07:49.407569+00 | 2018-07-12 12:07:50.843526+00 |        1 | 2018-07-12 12:08:00.834584+00
 51 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          4 | 2018-07-12 12:08:14.526+00    | 2018-07-12 12:08:15.768907+00 |        1 | 2018-07-12 12:08:25.762619+00
 53 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          4 | 2018-07-12 12:08:29.329766+00 | 2018-07-12 12:08:31.168762+00 |        2 | 2018-07-12 12:08:41.160209+00
{noformat}

> Explicit re-schedule of sensors
> -------------------------------
>
>                 Key: AIRFLOW-2747
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2747
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: core, operators
>    Affects Versions: 1.9.0
>            Reporter: Stefan Seelmann
>            Assignee: Stefan Seelmann
>            Priority: Major
>             Fix For: 2.0.0
>
>         Attachments: Screenshot_2018-07-12_14-10-24.png
>
>
> By default, sensors block a worker and just sleep between pokes. This is very 
> inefficient, especially when there are many long-running sensors.
> There is a hacky workaround: setting a small timeout value and a high retry 
> number. But that has drawbacks:
>  * Errors raised by sensors are hidden and the sensor retries too often
>  * The sensor is retried in a fixed time interval (with optional exponential 
> backoff)
>  * There are many attempts and many log files are generated
>  I'd like to propose an explicit reschedule mechanism:
>  * A new "reschedule" flag for sensors; if set to True, the sensor raises an 
> AirflowRescheduleException that causes a reschedule.
>  * AirflowRescheduleException contains the (earliest) re-schedule date.
>  * Reschedule requests are recorded in a new `task_reschedule` table and 
> visualized in the Gantt view.
>  * A new TI dependency that checks if a sensor task is ready to be 
> re-scheduled.
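
The proposed flow (raise an exception carrying the reschedule date, record it, and let a dependency check decide when the task may run again) can be sketched in plain Python. This is only an illustration and not the actual patch: `RescheduleRequested` is a stand-in for the proposed AirflowRescheduleException, the list `task_reschedule` stands in for the new table, and the function names are hypothetical.

```python
from datetime import datetime, timedelta


class RescheduleRequested(Exception):
    """Hypothetical stand-in for the proposed AirflowRescheduleException."""

    def __init__(self, reschedule_date):
        super().__init__(f"reschedule at {reschedule_date.isoformat()}")
        self.reschedule_date = reschedule_date


def run_sensor_once(poke, now, poke_interval, reschedule=False):
    """Run one poke and return True on success.

    With reschedule=True an unsuccessful poke raises instead of sleeping,
    so the worker slot is free until reschedule_date.
    """
    if poke():
        return True
    if reschedule:
        raise RescheduleRequested(now + timedelta(seconds=poke_interval))
    return False  # a blocking sensor would sleep and poke again here


def ready_to_reschedule(recorded_dates, now):
    """Dependency check: ready once the latest requested date has passed."""
    return not recorded_dates or now >= max(recorded_dates)


task_reschedule = []  # stands in for the task_reschedule table
now = datetime(2018, 7, 12, 12, 6, 54)
try:
    run_sensor_once(lambda: False, now, poke_interval=15, reschedule=True)
except RescheduleRequested as request:
    task_reschedule.append(request.reschedule_date)

print(ready_to_reschedule(task_reschedule, now + timedelta(seconds=5)))   # False
print(ready_to_reschedule(task_reschedule, now + timedelta(seconds=15)))  # True
```
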
> Advantages:
>  * This change is backward compatible. Existing sensors behave as before, 
> but it's now possible to set the "reschedule" flag.
>  * The poke_interval, timeout, and soft_fail parameters are still respected 
> and used to calculate the next schedule time.
>  * Custom sensor implementations can even define the next sensible schedule 
> date by raising AirflowRescheduleException themselves.
>  * Existing TimeSensor and TimeDeltaSensor can also be changed to be 
> rescheduled when the time is reached.
>  * This mechanism can also be used by non-sensor operators (but then the new 
> ReadyToRescheduleDep has to be added to deps or BaseOperator).
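
One way to picture how poke_interval, timeout, and soft_fail could feed into the next schedule time is the sketch below. It assumes that the timeout is measured from the first start of the sensor across all reschedules, which the description above does not state. The exception classes and the function name are hypothetical.

```python
from datetime import datetime, timedelta


class SensorTimeout(Exception):
    """Hypothetical: the sensor exceeded its timeout and should fail."""


class SensorSkipped(Exception):
    """Hypothetical: the sensor exceeded its timeout with soft_fail set."""


def next_reschedule_date(first_start, now, poke_interval, timeout, soft_fail=False):
    """Return the earliest date for the next poke, or raise once timed out.

    Assumption: `timeout` counts from `first_start`, not from the latest run.
    """
    if (now - first_start).total_seconds() > timeout:
        if soft_fail:
            raise SensorSkipped("sensor timed out, skipping")
        raise SensorTimeout("sensor timed out")
    return now + timedelta(seconds=poke_interval)


first_start = datetime(2018, 7, 12, 12, 0, 0)
now = first_start + timedelta(seconds=30)
print(next_reschedule_date(first_start, now, poke_interval=10, timeout=60))
```

A custom sensor that knows a better date than `now + poke_interval` could return or raise with that date instead, which is the case the third bullet above describes.
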
> Design decisions and caveats:
>  * When handling AirflowRescheduleException the `try_number` is decremented. 
> That means that subsequent runs use the same try number and write to the same 
> log file.
>  * The sensor TI dependency check now depends on the `task_reschedule` 
> table. However, only the BaseSensorOperator includes the new 
> ReadyToRescheduleDep for now.
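
The effect of decrementing `try_number` on a reschedule can be illustrated with a toy model. This is a sketch under assumptions: `TaskInstanceSketch` is a hypothetical stand-in, not Airflow's TaskInstance, and the log path layout is invented for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskInstanceSketch:
    """Hypothetical, minimal stand-in for a task instance."""

    task_id: str
    try_number: int = 0
    state: Optional[str] = None

    def log_file(self):
        # Invented layout: one log file per task and try number.
        return f"{self.task_id}/{self.try_number}.log"

    def start_run(self):
        """Each run increments try_number and returns the log file it writes to."""
        self.try_number += 1
        self.state = "running"
        return self.log_file()

    def handle_reschedule(self):
        """Undo the increment so the next run reuses the same try number."""
        self.try_number -= 1
        self.state = None

    def handle_failure(self):
        """A real failure keeps the increment, so a retry gets a new log file."""
        self.state = "up_for_retry"


ti = TaskInstanceSketch("s3")
first = ti.start_run()
ti.handle_reschedule()
second = ti.start_run()
print(first, second)  # s3/1.log s3/1.log

ti.handle_failure()
print(ti.start_run())  # s3/2.log
```
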
> Open questions and TODOs:
>  * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting 
> the state back to `NONE`? This would require more changes in scheduler code 
> and especially in the UI, but the state of a task would be more explicit and 
> more transparent to the user.
>  * Add example/test for a non-sensor operator
>  * Document the new feature



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
