[ 
https://issues.apache.org/jira/browse/AIRFLOW-4297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822945#comment-16822945
 ] 

Xiaodong DENG commented on AIRFLOW-4297:
----------------------------------------

For our notes, here is an analysis of the code chunk that discovers and 
updates SLA miss records:

Comments starting with "!!!Xiaodong" are my analysis lines.

 
{code:python}
# !!!Xiaodong: This part will find the LATEST SUCCESS/SKIPPED task instances
# !!!Xiaodong: Due to this design, if there are no SUCCESS/SKIPPED task instances for the DAG,
# !!!Xiaodong: no SLA miss record can be discovered or created.
TI = models.TaskInstance
sq = (
    session
    .query(
         TI.task_id,
         func.max(TI.execution_date).label('max_ti'))
    .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
    .filter(TI.dag_id == dag.dag_id)
    .filter(or_(
        TI.state == State.SUCCESS,
        TI.state == State.SKIPPED))
    .filter(TI.task_id.in_(dag.task_ids))
    .group_by(TI.task_id).subquery('sq')
)

max_tis = session.query(TI).filter(
    TI.dag_id == dag.dag_id,
    TI.task_id == sq.c.task_id,
    TI.execution_date == sq.c.max_ti,
).all()


ts = timezone.utcnow()
for ti in max_tis:
    task = dag.get_task(ti.task_id)

    dttm = ti.execution_date
    if isinstance(task.sla, timedelta):
        # !!!Xiaodong: For each latest SUCCESS/SKIPPED task instance, we get its execution_date first,
        # !!!Xiaodong: but we check from its start_time, which is why we have a
        # !!!Xiaodong: `dttm = dag.following_schedule(dttm)` here.

        # !!!Xiaodong: A BIG issue here: dag.following_schedule() will return None
        # !!!Xiaodong: if the schedule_interval of this DAG is None or "@once".
        # !!!Xiaodong: This is why we have the bug reported in this ticket
        # !!!Xiaodong: ('None' can't be compared with a datetime).
        dttm = dag.following_schedule(dttm)
        while dttm < timezone.utcnow():
            following_schedule = dag.following_schedule(dttm)
            if following_schedule + task.sla < timezone.utcnow():
                session.merge(SlaMiss(
                    task_id=ti.task_id,
                    dag_id=ti.dag_id,
                    execution_date=dttm,
                    timestamp=ts))
            dttm = dag.following_schedule(dttm)
session.commit()
{code}
From the chunk above, we can see that SchedulerJob.manage_slas() is written 
purely for DAGs whose schedule_interval is neither None nor "@once". Even for 
those DAGs it has a few essential flaws, such as the one described in my first 
comment in the code above.
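
To make the failure mode concrete, here is a minimal standalone sketch of the same loop with a None guard added. It is only an illustration, not the patch for this ticket: find_sla_misses is a hypothetical helper, and the following_schedule argument is an assumed stand-in for dag.following_schedule(), so the snippet runs without Airflow or a database session.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, List, Optional

# Hypothetical stand-in type for dag.following_schedule():
# returns the next schedule after dttm, or None if the DAG has none.
FollowingSchedule = Callable[[datetime], Optional[datetime]]


def find_sla_misses(
    last_execution_date: datetime,
    sla: timedelta,
    following_schedule: FollowingSchedule,
    now: datetime,
) -> List[datetime]:
    """Return the execution dates whose SLA deadline has already passed."""
    missed: List[datetime] = []
    dttm = following_schedule(last_execution_date)
    # Guard: with schedule_interval None or "@once" there is no following
    # schedule, so dttm is None and must not be compared with a datetime.
    while dttm is not None and dttm < now:
        period_end = following_schedule(dttm)
        if period_end is None:
            break
        if period_end + sla < now:
            missed.append(dttm)
        dttm = period_end
    return missed


now = datetime(2019, 3, 28, 12, 0, tzinfo=timezone.utc)
last = datetime(2019, 3, 28, 8, 0, tzinfo=timezone.utc)

# Simulates a DAG with schedule_interval=None: no TypeError, no SLA misses.
unscheduled = find_sla_misses(last, timedelta(minutes=30), lambda d: None, now)

# Simulates an hourly DAG: each period whose end + SLA is in the past is a miss.
hourly = find_sla_misses(
    last, timedelta(minutes=30), lambda d: d + timedelta(hours=1), now
)
```

With the guard in place, a DAG without a schedule simply yields no SLA misses instead of raising. Whether such DAGs should be skipped entirely before reaching this loop is a separate design decision.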

 

> Manually triggered DAG with no schedule_interval breaks scheduler
> -----------------------------------------------------------------
>
>                 Key: AIRFLOW-4297
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4297
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.10.3
>            Reporter: Ash Berlin-Taylor
>            Assignee: Xiaodong DENG
>            Priority: Blocker
>             Fix For: 1.10.4
>
>
> {code:title=example_dag.py}
> import datetime
>
> import pendulum
> from airflow import DAG
>
> timezone = "UTC"
> local_tz = pendulum.timezone(timezone)
> start_date = datetime.datetime.strptime('2019-03-28 07:57:00', "%Y-%m-%d %H:%M:%S")
> start_date = start_date.replace(tzinfo=local_tz)
> default_args = {
> 'owner': 'DaniRC'
> }
> dag = DAG('testsla',
>     default_args=default_args,
>     start_date=start_date,
>     concurrency=1,
>     max_active_runs=1,
>     default_view='tree',
>     orientation='TB',
>     catchup=False,
>     schedule_interval=None
>     )
> {code}
> If this DAG is triggered, then this error occurs:
> {noformat}
> Process DagFileProcessor5303-Process:
> Traceback (most recent call last):
>   File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
>     self._target(*self._args, **self._kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 402, in helper
>     pickle_dags)
>   File "/usr/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1760, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1452, in _process_dags
>     self.manage_slas(dag)
>   File "/usr/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 662, in manage_slas
>     while dttm < timezone.utcnow():
> TypeError: '<' not supported between instances of 'NoneType' and 'datetime.datetime'
> {noformat}
> After this happens the scheduler also won't schedule any further tasks for 
> _ANY_ dag.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
