[jira] [Updated] (AIRFLOW-647) Restore dag.get_active_runs(), without the DB updates
[ https://issues.apache.org/jira/browse/AIRFLOW-647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-647:
--------------------------------
    Description:
"AIRFLOW-128 Optimize and refactor process_dag" removed dag.get_active_runs(), which both returned a list of execution dates for a DAG's running DagRuns and validated and updated their states in the DB. That change split the getter into two methods (update_state and verify_integrity); however, neither returns the list of execution dates, which has valid uses (we leverage it in maintenance DAGs). This bug-fix PR will restore the getter so that it only returns a list of execution dates, which will fix DAGs that relied on the returned dates.

  was:
"AIRFLOW-128 Optimize and refactor process_dag" removed dag.get_active_runs(), which both returned a list of execution dates for a DAG's running DagRuns and validated and updated their states in the DB. That change split the getter into two methods (update_state and verify_integrity); however, neither returns the list of execution dates, which has valid uses (we leverage it in maintenance DAGs). This bug-fix PR will restore the getter so that it only returns a list of execution dates.


> Restore dag.get_active_runs(), without the DB updates
> -----------------------------------------------------
>
>                 Key: AIRFLOW-647
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-647
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: models
>    Affects Versions: Airflow 1.8
>            Reporter: Ben Tallman
>            Assignee: Ben Tallman
>             Fix For: Airflow 1.8
>
> "AIRFLOW-128 Optimize and refactor process_dag" removed dag.get_active_runs(), which both returned a list of execution dates for a DAG's running DagRuns and validated and updated their states in the DB. That change split the getter into two methods (update_state and verify_integrity); however, neither returns the list of execution dates, which has valid uses (we leverage it in maintenance DAGs).
> This bug-fix PR will restore the getter so that it only returns a list of execution dates, which will fix DAGs that relied on the returned dates.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
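The restored getter described above only reads run state and returns execution dates. As a rough sketch of that contract (the real method queries the metadata DB via SQLAlchemy; the DagRun dataclass and in-memory list here are stand-ins, not Airflow's models):

```python
from dataclasses import dataclass
from datetime import datetime

# Hypothetical stand-ins for Airflow's DagRun model and State values.
RUNNING, SUCCESS = "running", "success"

@dataclass
class DagRun:
    dag_id: str
    execution_date: datetime
    state: str

def get_active_runs(dag_id, dag_runs):
    """Return the execution dates of this DAG's running DagRuns,
    without validating or updating any state in the DB."""
    return [r.execution_date for r in dag_runs
            if r.dag_id == dag_id and r.state == RUNNING]

runs = [
    DagRun("maintenance", datetime(2016, 11, 1), RUNNING),
    DagRun("maintenance", datetime(2016, 11, 2), SUCCESS),
    DagRun("other_dag", datetime(2016, 11, 1), RUNNING),
]
print(get_active_runs("maintenance", runs))  # only the running run's date
```

A maintenance DAG can then iterate over the returned dates without any side effects on run state, which is the behavior the split into update_state/verify_integrity lost.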
[jira] [Updated] (AIRFLOW-627) Tasks getting Queued when Pool is full sometimes never run
[ https://issues.apache.org/jira/browse/AIRFLOW-627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-627:
--------------------------------
    Description:
Log data when this happens:
[2016-11-14 10:54:04,174] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
[2016-11-14 10:54:07,562] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:07,667] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:27,214] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
[2016-11-14 10:54:30,150] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:30,311] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:32,438] {models.py:1072} INFO - Dependencies all met for
[2016-11-14 10:54:32,700] {models.py:1069} WARNING - Dependencies not met for , dependency 'DAG's Pool Has Space' FAILED: Task's pool 'prod_pod_crawler' is full.
depends_on_past False
deps set([, , ])

  was:
[2016-11-14 10:54:04,174] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
[2016-11-14 10:54:07,562] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:07,667] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:27,214] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
[2016-11-14 10:54:30,150] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:30,311] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:32,438] {models.py:1072} INFO - Dependencies all met for
[2016-11-14 10:54:32,700] {models.py:1069} WARNING - Dependencies not met for , dependency 'DAG's Pool Has Space' FAILED: Task's pool 'prod_pod_crawler' is full.


> Tasks getting Queued when Pool is full sometimes never run
> ----------------------------------------------------------
>
>                 Key: AIRFLOW-627
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-627
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.8
>         Environment: Celery Executor, Master Branch, Postgres
>            Reporter: Ben Tallman
>
> Log data when this happens:
> [2016-11-14 10:54:04,174] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
> [2016-11-14 10:54:07,562] {base_hook.py:67} INFO - Using connection to: db.xyz.com
> [2016-11-14 10:54:07,667] {base_hook.py:67} INFO - Using connection to: db.xyz.com
> [2016-11-14 10:54:27,214] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
> [2016-11-14 10:54:30,150] {base_hook.py:67} INFO - Using connection to: db.xyz.com
> [2016-11-14 10:54:30,311] {base_hook.py:67} INFO - Using connection to: db.xyz.com
> [2016-11-14 10:54:32,438] {models.py:1072} INFO - Dependencies all met for 01:00:00 [queued]>
> [2016-11-14 10:54:32,700] {models.py:1069} WARNING - Dependencies not met for 01:00:00 [queued]>, dependency 'DAG's Pool Has Space' FAILED: Task's pool 'prod_pod_crawler' is full.
> depends_on_past False
> deps set([ , , ])
[jira] [Updated] (AIRFLOW-627) Tasks getting Queued when Pool is full sometimes never run
[ https://issues.apache.org/jira/browse/AIRFLOW-627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-627:
--------------------------------
    Description:
Log data when this happens:
[2016-11-14 10:54:04,174] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
[2016-11-14 10:54:07,562] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:07,667] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:27,214] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
[2016-11-14 10:54:30,150] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:30,311] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:32,438] {models.py:1072} INFO - Dependencies all met for
[2016-11-14 10:54:32,700] {models.py:1069} WARNING - Dependencies not met for , dependency 'DAG's Pool Has Space' FAILED: Task's pool 'prod_pod_crawler' is full.
Task Details info:
depends_on_past False
deps set([, , ])

  was:
Log data when this happens:
[2016-11-14 10:54:04,174] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
[2016-11-14 10:54:07,562] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:07,667] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:27,214] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
[2016-11-14 10:54:30,150] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:30,311] {base_hook.py:67} INFO - Using connection to: db.xyz.com
[2016-11-14 10:54:32,438] {models.py:1072} INFO - Dependencies all met for
[2016-11-14 10:54:32,700] {models.py:1069} WARNING - Dependencies not met for , dependency 'DAG's Pool Has Space' FAILED: Task's pool 'prod_pod_crawler' is full.
depends_on_past False
deps set([ , , ])


> Tasks getting Queued when Pool is full sometimes never run
> ----------------------------------------------------------
>
>                 Key: AIRFLOW-627
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-627
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: Airflow 1.8
>         Environment: Celery Executor, Master Branch, Postgres
>            Reporter: Ben Tallman
>
> Log data when this happens:
> [2016-11-14 10:54:04,174] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
> [2016-11-14 10:54:07,562] {base_hook.py:67} INFO - Using connection to: db.xyz.com
> [2016-11-14 10:54:07,667] {base_hook.py:67} INFO - Using connection to: db.xyz.com
> [2016-11-14 10:54:27,214] {models.py:168} INFO - Filling up the DagBag from /opt/efs/airflow/dags/crawl_traffic_prod.py
> [2016-11-14 10:54:30,150] {base_hook.py:67} INFO - Using connection to: db.xyz.com
> [2016-11-14 10:54:30,311] {base_hook.py:67} INFO - Using connection to: db.xyz.com
> [2016-11-14 10:54:32,438] {models.py:1072} INFO - Dependencies all met for 01:00:00 [queued]>
> [2016-11-14 10:54:32,700] {models.py:1069} WARNING - Dependencies not met for 01:00:00 [queued]>, dependency 'DAG's Pool Has Space' FAILED: Task's pool 'prod_pod_crawler' is full.
> Task Details info:
> depends_on_past False
> deps set([ , , ])
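The "DAG's Pool Has Space" dependency in the logs above gates a queued task on free pool slots. The toy model below (the Pool class and task names are illustrative, not Airflow's scheduler code) shows the intended behavior: a task over the slot limit waits in the queue, and the scheduler must re-check queued tasks once a slot frees; the bug report describes cases where that re-check apparently never happens, so the task stays queued forever:

```python
from collections import deque

# Toy model of pool-limited scheduling: a pool has a fixed number of
# slots, and queued task instances only run when a slot is free.
class Pool:
    def __init__(self, slots):
        self.slots = slots
        self.used = 0

    def has_space(self):
        return self.used < self.slots

def admit(pool, queued, running):
    """Move queued tasks into running while the pool has space."""
    while queued and pool.has_space():
        running.append(queued.popleft())
        pool.used += 1

pool = Pool(slots=2)
queued = deque(["t1", "t2", "t3"])
running = []

admit(pool, queued, running)      # t1 and t2 run; t3 waits on the pool
print(running, list(queued))      # ['t1', 't2'] ['t3']

running.remove("t1")              # a running task finishes...
pool.used -= 1
admit(pool, queued, running)      # ...and the scheduler must re-admit t3
print(running, list(queued))      # ['t2', 't3'] []
```

If the second `admit` pass is never triggered for a task, it exhibits exactly the reported symptom: queued on a full pool, never run.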
[jira] [Updated] (AIRFLOW-464) Add setdefault method to Variable Object
[ https://issues.apache.org/jira/browse/AIRFLOW-464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-464:
--------------------------------
    Description:
In order to assist with environment migrations, we added a setdefault method to the Variable object. This allows default variables to be created (and then edited) with less chance for typos/copy+paste bugs.

Variable.setdefault(key, default, deserialize_json=[True|False]) returns either the value stored in Variable(key) or sets Variable(key) = default and returns default.

  was:
In order to assist with environment migrations, add a create_if_none option to the Variable.get function. This allows default variables to be created (and then edited) with less chance for typos/copy+paste bugs.


> Add setdefault method to Variable Object
> ----------------------------------------
>
>                 Key: AIRFLOW-464
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-464
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Ben Tallman
>            Assignee: Ben Tallman
>            Priority: Trivial
>
> In order to assist with environment migrations, we added a setdefault method to the Variable object. This allows default variables to be created (and then edited) with less chance for typos/copy+paste bugs.
> Variable.setdefault(key, default, deserialize_json=[True|False]) returns either the value stored in Variable(key) or sets Variable(key) = default and returns default.
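The semantics described above mirror dict.setdefault. A minimal in-memory sketch of that contract (the real Variable.setdefault reads and writes the Airflow metadata DB; the `_store` dict and key names here are stand-ins):

```python
import json

# In-memory stand-in for the variable table: key -> serialized string.
_store = {}

def setdefault(key, default, deserialize_json=False):
    """Return the stored value for key; otherwise store default and return it."""
    if key in _store:
        raw = _store[key]
        return json.loads(raw) if deserialize_json else raw
    _store[key] = json.dumps(default) if deserialize_json else default
    return default

# The first call creates the variable; later calls return the stored value,
# even if a different default is passed.
print(setdefault("crawler_config", {"retries": 3}, deserialize_json=True))
print(setdefault("crawler_config", {"retries": 99}, deserialize_json=True))
```

This is why it helps migrations: a new DAG can call setdefault on first run to surface its config with sane defaults, and operators then edit the now-visible variable rather than hunting for hard-coded default_var values.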
[jira] [Updated] (AIRFLOW-464) Add setdefault method to Variable Object
[ https://issues.apache.org/jira/browse/AIRFLOW-464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-464:
--------------------------------
    Summary: Add setdefault method to Variable Object  (was: Add create_if_none option to Variable.get)


> Add setdefault method to Variable Object
> ----------------------------------------
>
>                 Key: AIRFLOW-464
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-464
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Ben Tallman
>            Assignee: Ben Tallman
>            Priority: Trivial
>
> In order to assist with environment migrations, add a create_if_none option to the Variable.get function. This allows default variables to be created (and then edited) with less chance for typos/copy+paste bugs.
[jira] [Commented] (AIRFLOW-582) ti.get_dagrun() should not filter on start_date
[ https://issues.apache.org/jira/browse/AIRFLOW-582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15614396#comment-15614396 ]

Ben Tallman commented on AIRFLOW-582:
-------------------------------------

    dr = session.query(DagRun).filter(
        DagRun.dag_id == self.dag_id,
        DagRun.execution_date == self.execution_date,
        DagRun.start_date == self.start_date
    ).first()

SHOULD INSTEAD BE

    dr = session.query(DagRun).filter(
        DagRun.dag_id == self.dag_id,
        DagRun.execution_date == self.execution_date
    ).first()


> ti.get_dagrun() should not filter on start_date
> -----------------------------------------------
>
>                 Key: AIRFLOW-582
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-582
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Ben Tallman
>
> The filter in ti.get_dagrun should not include start_date; it should only search on execution_date.
> @provide_session
> def get_dagrun(self, session):
>     """
>     Returns the DagRun for this TaskInstance
>     :param session:
>     :return: DagRun
>     """
>     dr = session.query(DagRun).filter(
>         DagRun.dag_id == self.dag_id,
>         DagRun.execution_date == self.execution_date,
>         DagRun.start_date == self.start_date
>     ).first()
>     return dr
[jira] [Commented] (AIRFLOW-385) Scheduler logs should have a "latest" directory
[ https://issues.apache.org/jira/browse/AIRFLOW-385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15589859#comment-15589859 ]

Ben Tallman commented on AIRFLOW-385:
-------------------------------------

Not sure I'm worried about the "latest" flag, but moving to dag_id/execution_date/task_id.log would be very helpful...


> Scheduler logs should have a "latest" directory
> -----------------------------------------------
>
>                 Key: AIRFLOW-385
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-385
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: logging
>            Reporter: Dan Davydov
>            Assignee: Vijay Bhat
>            Priority: Minor
>              Labels: beginner, logging
>
> Right now the scheduler logs for each parsed file look like /tmp/airflow/scheduler/logs/2016-07-17/some-dag/...
> The problem is this makes it hard to find the latest logs, which is the most frequent use case.
> Ideally we would create, and have the scheduler keep up to date, a symlink to the latest date /tmp/airflow/scheduler/logs/latest which would point to e.g. /tmp/airflow/scheduler/logs/2016-07-17
> We might also want to consider changing the structure to dag/date instead of date/dag too, but that can be done as a separate task if desired.
[jira] [Created] (AIRFLOW-582) ti.get_dagrun() should not filter on start_date
Ben Tallman created AIRFLOW-582:
-----------------------------------

             Summary: ti.get_dagrun() should not filter on start_date
                 Key: AIRFLOW-582
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-582
             Project: Apache Airflow
          Issue Type: Bug
            Reporter: Ben Tallman

The filter in ti.get_dagrun should not include start_date; it should only search on execution_date.

    @provide_session
    def get_dagrun(self, session):
        """
        Returns the DagRun for this TaskInstance
        :param session:
        :return: DagRun
        """
        dr = session.query(DagRun).filter(
            DagRun.dag_id == self.dag_id,
            DagRun.execution_date == self.execution_date,
            DagRun.start_date == self.start_date
        ).first()
        return dr
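The reason the extra predicate is a bug: a TaskInstance's start_date records when the task began executing, which generally differs from the DagRun's start_date, so matching both execution_date and start_date finds no row. A minimal illustration using plain dicts in place of the SQLAlchemy query (the dates here are made up for the example):

```python
from datetime import datetime

# One DagRun: the run itself started a few seconds after its execution_date.
dag_runs = [{"dag_id": "d", "execution_date": datetime(2016, 10, 1),
             "start_date": datetime(2016, 10, 1, 0, 0, 5)}]

# The TaskInstance shares the execution_date but started later.
ti_execution_date = datetime(2016, 10, 1)
ti_start_date = datetime(2016, 10, 1, 0, 1, 30)

# Buggy filter: also matches start_date, so it finds nothing.
buggy = [r for r in dag_runs
         if r["execution_date"] == ti_execution_date
         and r["start_date"] == ti_start_date]

# Fixed filter: match on execution_date (and dag_id) only.
fixed = [r for r in dag_runs if r["execution_date"] == ti_execution_date]

print(len(buggy), len(fixed))  # 0 1
```

Since dag_id plus execution_date already identifies the DagRun, dropping start_date loses nothing and restores the lookup.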
[jira] [Updated] (AIRFLOW-510) Filter Paused Dags, add Last Run Column, add Trigger Dag Icon
[ https://issues.apache.org/jira/browse/AIRFLOW-510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-510:
--------------------------------
    Description:
Modify the HomeView to optionally filter out paused DAGs, display the last run date/time for each DAG, and allow triggering a DAG externally from an icon.

  was:
Hide "paused" DAGs from the HomeView and provide a link to show them


> Filter Paused Dags, add Last Run Column, add Trigger Dag Icon
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-510
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-510
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: ui
>            Reporter: Ben Tallman
>            Assignee: Ben Tallman
>            Priority: Minor
>
> Modify the HomeView to optionally filter out paused DAGs, display the last run date/time for each DAG, and allow triggering a DAG externally from an icon.
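The filtering part of this change is a simple visibility predicate. A sketch of the idea (the dict rows and `visible_dags` helper are illustrative stand-ins; the real HomeView filters DagModel rows on an is_paused-style flag):

```python
# Stand-in rows for the DAG list shown on the home page.
dags = [
    {"dag_id": "etl", "is_paused": False, "last_run": "2016-09-01"},
    {"dag_id": "old_job", "is_paused": True, "last_run": None},
]

def visible_dags(dags, show_paused=False):
    """Hide paused DAGs by default; show everything when requested."""
    return [d for d in dags if show_paused or not d["is_paused"]]

print([d["dag_id"] for d in visible_dags(dags)])        # ['etl']
print([d["dag_id"] for d in visible_dags(dags, True)])  # ['etl', 'old_job']
```

The last-run column and trigger icon are then per-row additions on top of the same filtered list.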
[jira] [Updated] (AIRFLOW-510) Filter Paused Dags, add Last Run Column, add Trigger Dag Icon
[ https://issues.apache.org/jira/browse/AIRFLOW-510?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-510:
--------------------------------
    Summary: Filter Paused Dags, add Last Run Column, add Trigger Dag Icon  (was: Hide "paused" DAGs)


> Filter Paused Dags, add Last Run Column, add Trigger Dag Icon
> -------------------------------------------------------------
>
>                 Key: AIRFLOW-510
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-510
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: ui
>            Reporter: Ben Tallman
>            Assignee: Ben Tallman
>            Priority: Minor
>
> Hide "paused" DAGs from the HomeView and provide a link to show them
[jira] [Assigned] (AIRFLOW-558) Add Support for dag.backfill=(True|False) Option
[ https://issues.apache.org/jira/browse/AIRFLOW-558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman reassigned AIRFLOW-558:
-----------------------------------
    Assignee: Ben Tallman


> Add Support for dag.backfill=(True|False) Option
> ------------------------------------------------
>
>                 Key: AIRFLOW-558
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-558
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: core
>            Reporter: Ben Tallman
>            Assignee: Ben Tallman
[jira] [Updated] (AIRFLOW-558) Add Support for dag.backfill=(True|False) Option
[ https://issues.apache.org/jira/browse/AIRFLOW-558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-558:
--------------------------------
    Priority: Major  (was: Critical)


> Add Support for dag.backfill=(True|False) Option
> ------------------------------------------------
>
>                 Key: AIRFLOW-558
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-558
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: core
>            Reporter: Ben Tallman
[jira] [Updated] (AIRFLOW-558) Add Support for dag.backfill=(True|False) Option
[ https://issues.apache.org/jira/browse/AIRFLOW-558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-558:
--------------------------------
    Issue Type: Improvement  (was: Bug)


> Add Support for dag.backfill=(True|False) Option
> ------------------------------------------------
>
>                 Key: AIRFLOW-558
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-558
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: core
>            Reporter: Ben Tallman
>            Priority: Critical
[jira] [Created] (AIRFLOW-558) Add Support for dag.backfill=(True|False) Option
Ben Tallman created AIRFLOW-558:
-----------------------------------

             Summary: Add Support for dag.backfill=(True|False) Option
                 Key: AIRFLOW-558
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-558
             Project: Apache Airflow
          Issue Type: Bug
          Components: core
            Reporter: Ben Tallman
            Priority: Critical
[jira] [Commented] (AIRFLOW-464) Add create_if_none option to Variable.get
[ https://issues.apache.org/jira/browse/AIRFLOW-464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453003#comment-15453003 ]

Ben Tallman commented on AIRFLOW-464:
-------------------------------------

By definition these are not constants, being called variables. We use them as configuration items, and as such they are mostly owned by a single DAG. Variables may or may not be present for many reasons, though, and in our environment we use this to ensure that a new DAG can self-configure cleanly. Basically, we see default_var as the real issue, as it creates very opaque config options. Using this, we are able to surface those options so they are both available and obvious for editing.


> Add create_if_none option to Variable.get
> -----------------------------------------
>
>                 Key: AIRFLOW-464
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-464
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Ben Tallman
>            Assignee: Ben Tallman
>            Priority: Trivial
>
> In order to assist with environment migrations, add a create_if_none option to the Variable.get function. This allows default variables to be created (and then edited) with less chance for typos/copy+paste bugs.
[jira] [Updated] (AIRFLOW-464) Add create_if_none option to Variable.get
[ https://issues.apache.org/jira/browse/AIRFLOW-464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ben Tallman updated AIRFLOW-464:
--------------------------------
    Description:
In order to assist with environment migrations, add a create_if_none option to the Variable.get function. This allows default variables to be created (and then edited) with less chance for typos/copy+paste bugs.


> Add create_if_none option to Variable.get
> -----------------------------------------
>
>                 Key: AIRFLOW-464
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-464
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Ben Tallman
>            Assignee: Ben Tallman
>            Priority: Trivial
>
> In order to assist with environment migrations, add a create_if_none option to the Variable.get function. This allows default variables to be created (and then edited) with less chance for typos/copy+paste bugs.
[jira] [Created] (AIRFLOW-464) Add create_if_none option to Variable.get
Ben Tallman created AIRFLOW-464:
-----------------------------------

             Summary: Add create_if_none option to Variable.get
                 Key: AIRFLOW-464
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-464
             Project: Apache Airflow
          Issue Type: Improvement
            Reporter: Ben Tallman
            Assignee: Ben Tallman
            Priority: Trivial
[jira] [Created] (AIRFLOW-448) Add Apigee as an official user
Ben Tallman created AIRFLOW-448:
-----------------------------------

             Summary: Add Apigee as an official user
                 Key: AIRFLOW-448
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-448
             Project: Apache Airflow
          Issue Type: Task
            Reporter: Ben Tallman
            Assignee: Ben Tallman
            Priority: Trivial
[jira] [Commented] (AIRFLOW-249) Refactor the SLA mechanism
[ https://issues.apache.org/jira/browse/AIRFLOW-249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15343075#comment-15343075 ]

Ben Tallman commented on AIRFLOW-249:
-------------------------------------

My only comment, looking at the PR comments, is that SLA is poorly defined to begin with. We set it at the per-task level, but don't really discuss the differences between an execution timeout and an SLA timeout. With the attribute being on each task, I am not sure how this can be handled anywhere but at the task level. However, I believe that a DAG-level SLA is more important.


> Refactor the SLA mechanism
> --------------------------
>
>                 Key: AIRFLOW-249
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-249
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: dud
>            Assignee: dud
>
> Hello
> I've noticed the SLA feature currently behaves as follows:
> - it doesn't work on DAGs scheduled @once or None, because they have no dag.following_schedule property
> - it keeps endlessly checking for SLA misses without ever worrying about any end_date. Worse, I noticed that emails are still being sent for runs that are never happening because of end_date
> - it keeps checking recent TIs even if an SLA notification has already been sent for them
> - the SLA logic is only fired after following_schedule + sla has elapsed; in other words, one has to wait for the next TI before having a chance of getting any email. Also the email reports the dag.following_schedule time (I guess because it is close to TI.start_date), but unfortunately that matches neither what the task instance shows nor the log filename
> - the SLA logic is based on max(TI.execution_date) as the starting point of its checks, which means that for a DAG whose SLA is longer than its schedule period, if half of the TIs run longer than expected it will go unnoticed.
> This could be demonstrated with a DAG like this one:
> {code}
> from airflow import DAG
> from airflow.operators import *
> from datetime import datetime, timedelta
> from time import sleep
>
> default_args = {
>     'owner': 'airflow',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 6, 16, 12, 20),
>     'email': my_email,
>     'sla': timedelta(minutes=2),
> }
>
> dag = DAG('unnoticed_sla', default_args=default_args,
>           schedule_interval=timedelta(minutes=1))
>
> def alternating_sleep(**kwargs):
>     minute = kwargs['execution_date'].strftime("%M")
>     is_odd = int(minute) % 2
>     if is_odd:
>         sleep(300)
>     else:
>         sleep(10)
>     return True
>
> PythonOperator(
>     task_id='sla_miss',
>     python_callable=alternating_sleep,
>     provide_context=True,
>     dag=dag)
> {code}
> I've tried to rework the SLA triggering mechanism by addressing the above points; please [have a look on it|https://github.com/dud225/incubator-airflow/commit/972260354075683a8d55a1c960d839c37e629e7d]
> I made some tests with this patch:
> - the fluctuating DAG shown above no longer makes Airflow skip any SLA event:
> {code}
>  task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:05:00 | t          | 2016-06-16 15:08:26.058631 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:07:00 | t          | 2016-06-16 15:10:06.093253 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:09:00 | t          | 2016-06-16 15:12:06.241773 |             | t
> {code}
> - on a normal DAG, the SLA is being triggered more quickly:
> {code}
> // start_date = 2016-06-16 15:55:00
> // end_date = 2016-06-16 16:00:00
> // schedule_interval = timedelta(minutes=1)
> // sla = timedelta(minutes=2)
>  task_id  |    dag_id     |   execution_date    | email_sent |         timestamp          | description | notification_sent
> ----------+---------------+---------------------+------------+----------------------------+-------------+-------------------
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:55:00 | t          | 2016-06-16 15:58:11.832299 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:56:00 | t          | 2016-06-16 15:59:09.663778 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:57:00 | t          | 2016-06-16 16:00:13.651422 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:58:00 | t          | 2016-06-16 16:01:08.576399 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 15:59:00 | t          | 2016-06-16 16:02:08.523486 |             | t
>  sla_miss | dag_sla_miss1 | 2016-06-16 16:00:00 | t          | 2016-06-16 16:03:08.538593 |             | t
> (6 rows)
> {code}
> than before (current master branch):