[ https://issues.apache.org/jira/browse/AIRFLOW-4297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16822945#comment-16822945 ]
Xiaodong DENG commented on AIRFLOW-4297:
----------------------------------------

For our notes, here is an analysis of the code chunk that discovers and updates SLA miss records. Comments starting with "!!!Xiaodong" are my analysis lines.

{code:python}
# !!!Xiaodong: This part finds the LATEST SUCCESS/SKIPPED task instances.
# !!!Xiaodong: Because of this design, if there are no SUCCESS/SKIPPED task instances for the DAG,
# !!!Xiaodong: no SLA miss record can be discovered or created.
TI = models.TaskInstance
sq = (
    session
    .query(
        TI.task_id,
        func.max(TI.execution_date).label('max_ti'))
    .with_hint(TI, 'USE INDEX (PRIMARY)', dialect_name='mysql')
    .filter(TI.dag_id == dag.dag_id)
    .filter(or_(
        TI.state == State.SUCCESS,
        TI.state == State.SKIPPED))
    .filter(TI.task_id.in_(dag.task_ids))
    .group_by(TI.task_id).subquery('sq')
)

max_tis = session.query(TI).filter(
    TI.dag_id == dag.dag_id,
    TI.task_id == sq.c.task_id,
    TI.execution_date == sq.c.max_ti,
).all()

ts = timezone.utcnow()
for ti in max_tis:
    task = dag.get_task(ti.task_id)
    dttm = ti.execution_date
    if isinstance(task.sla, timedelta):
        # !!!Xiaodong: For each latest SUCCESS/SKIPPED task we first get its execution_date,
        # !!!Xiaodong: but the check starts from when the run actually starts (the end of its schedule period);
        # !!!Xiaodong: that's why we have `dttm = dag.following_schedule(dttm)` here.
        # !!!Xiaodong: A BIG issue here: dag.following_schedule() returns None
        # !!!Xiaodong: if the schedule_interval of this DAG is None or "@once".
        # !!!Xiaodong: This is why we have the bug reported in this ticket (None can't be compared with a datetime).
        dttm = dag.following_schedule(dttm)
        while dttm < timezone.utcnow():
            following_schedule = dag.following_schedule(dttm)
            if following_schedule + task.sla < timezone.utcnow():
                session.merge(SlaMiss(
                    task_id=ti.task_id,
                    dag_id=ti.dag_id,
                    execution_date=dttm,
                    timestamp=ts))
            dttm = dag.following_schedule(dttm)
session.commit()
{code}

From the chunk above, we can see that SchedulerJob.manage_slas() is written purely for DAGs whose schedule_interval is neither None nor "@once". Even for those DAGs, it comes with a few essential flaws, such as the one described in my first comment in the code above.

> Manually triggered DAG with no schedule_interval breaks scheduler
> ----------------------------------------------------------------
>
>                 Key: AIRFLOW-4297
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4297
>             Project: Apache Airflow
>          Issue Type: Bug
>    Affects Versions: 1.10.3
>            Reporter: Ash Berlin-Taylor
>            Assignee: Xiaodong DENG
>            Priority: Blocker
>             Fix For: 1.10.4
>
>
> {code:title=example_dag.py}
> import datetime
>
> import pendulum
> from airflow import DAG
>
> timezone = "UTC"
> local_tz = pendulum.timezone(timezone)
> start_date = datetime.datetime.strptime('2019-03-28 07:57:00', "%Y-%m-%d %H:%M:%S")
> start_date = start_date.replace(tzinfo=local_tz)
>
> default_args = {
>     'owner': 'DaniRC'
> }
>
> dag = DAG('testsla',
>           default_args=default_args,
>           start_date=start_date,
>           concurrency=1,
>           max_active_runs=1,
>           default_view='tree',
>           orientation='TB',
>           catchup=False,
>           schedule_interval=None
>           )
> {code}
> If this DAG is triggered, the following error occurs:
> {noformat}
> Process DagFileProcessor5303-Process:
> Traceback (most recent call last):
>   File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
>     self.run()
>   File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
>     self._target(*self._args, **self._kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 402, in helper
>     pickle_dags)
>   File "/usr/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1760, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 1452, in _process_dags
>     self.manage_slas(dag)
>   File "/usr/lib/python3.6/site-packages/airflow/utils/db.py", line 73, in wrapper
>     return func(*args, **kwargs)
>   File "/usr/lib/python3.6/site-packages/airflow/jobs.py", line 662, in manage_slas
>     while dttm < timezone.utcnow():
> TypeError: '<' not supported between instances of 'NoneType' and 'datetime.datetime'
> {noformat}
> After this happens, the scheduler also won't schedule any further tasks for _ANY_ dag.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
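The failure mode analysed in this comment (the next schedule is None, so `dttm < now` raises TypeError) can be reproduced without Airflow. Below is a minimal sketch under stated assumptions: `following_schedule` and `find_sla_misses` are hypothetical stand-ins, not Airflow APIs; only `timedelta` schedule intervals are modelled (cron strings are out of scope); and the `is None` guard is one possible way to avoid the crash, not necessarily the fix that was merged for this ticket.

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional, Union


def following_schedule(schedule_interval: Union[None, str, timedelta],
                       dttm: datetime) -> Optional[datetime]:
    """Hypothetical stand-in for DAG.following_schedule().

    Mirrors the behaviour described above: there is no next schedule
    (None is returned) when schedule_interval is None or "@once".
    """
    if schedule_interval is None or schedule_interval == "@once":
        return None
    if isinstance(schedule_interval, timedelta):
        return dttm + schedule_interval
    # Assumption: cron expressions are not modelled in this sketch.
    raise NotImplementedError("cron schedule_interval is not modelled here")


def find_sla_misses(schedule_interval: Union[None, str, timedelta],
                    last_execution_date: datetime,
                    sla: timedelta,
                    now: datetime) -> List[datetime]:
    """Same walk as the manage_slas() loop, plus a None guard (hypothetical)."""
    misses = []
    dttm = following_schedule(schedule_interval, last_execution_date)
    # Guard: with no next schedule there is no period to measure the SLA
    # against, so stop instead of comparing None with a datetime.
    while dttm is not None and dttm < now:
        following = following_schedule(schedule_interval, dttm)
        if following is None:
            break
        if following + sla < now:
            misses.append(dttm)
        dttm = following
    return misses
```

For example, with a 6-hour interval, a 1-hour SLA, a last successful run at 2019-04-19 00:00 UTC and "now" at 2019-04-20 00:00 UTC, the periods starting at 06:00 and 12:00 are reported as missed. Calling it with `schedule_interval=None` or `"@once"` returns an empty list instead of raising.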