[jira] [Issue Comment Deleted] (AIRFLOW-1156) Using a timedelta object as a Schedule Interval with catchup=False causes the start_date to no longer be honored.
[ https://issues.apache.org/jira/browse/AIRFLOW-1156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antti Ruokomäki updated AIRFLOW-1156: - Comment: was deleted (was: Additionally, this seems to occur with a regular cron expression, "0 21 * * *") > Using a timedelta object as a Schedule Interval with catchup=False causes the > start_date to no longer be honored. > - > > Key: AIRFLOW-1156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1156 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: Airflow 1.8 >Reporter: Zachary Lawson >Priority: Minor > > Currently, in Airflow v1.8, if you set your schedule_interval to a timedelta > object and set catchup=False, the start_date is no longer honored and the DAG > is scheduled immediately upon unpausing the DAG. It is then scheduled on the > schedule interval from that point onward. Example below: > {code} > from airflow import DAG > from datetime import datetime, timedelta > import logging > from airflow.operators.python_operator import PythonOperator > default_args = { > 'owner': 'airflow', > 'depends_on_past': False, > 'start_date': datetime(2015, 6, 1), > } > dag = DAG('test', default_args=default_args, > schedule_interval=timedelta(seconds=5), catchup=False) > def context_test(ds, **context): > logging.info('testing') > test_context = PythonOperator( > task_id='test_context', > provide_context=True, > python_callable=context_test, > dag=dag > ) > {code} > If you switch the above over to a CRON expression, the behavior of the > scheduling is returned to the expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2169) Fail to discern between VARBINARY and VARCHAR in MySQL
[ https://issues.apache.org/jira/browse/AIRFLOW-2169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426378#comment-16426378 ] ASF subversion and git services commented on AIRFLOW-2169: -- Commit 4c89e440ef34b43142ef7f61a2fb6424dfc7f00f in incubator-airflow's branch refs/heads/master from [~whynick1] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4c89e44 ] [AIRFLOW-2169] Fix type 'bytes' is not JSON serializable in python3 > Fail to discern between VARBINARY and VARCHAR in MySQL > -- > > Key: AIRFLOW-2169 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2169 > Project: Apache Airflow > Issue Type: Bug > Components: db, operators >Reporter: Hongyi Wang >Assignee: Hongyi Wang >Priority: Major > Fix For: 1.10.0 > > > Current MySqlToGoogleCloudStorageOperator has difficulty to discern between > VARBINARY and VARCHAR in MySQL (and other similar fields–CHAR/BINARY, etc). > While "binary-related" MySQL data types, like VARBINARY, should be mapped to > "BYTES" in Google Cloud Storage, rather than "STRING". -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[1/2] incubator-airflow git commit: [AIRFLOW-2169] Fix type 'bytes' is not JSON serializable in python3
Repository: incubator-airflow Updated Branches: refs/heads/master 9c0c4264c -> f865c7898 [AIRFLOW-2169] Fix type 'bytes' is not JSON serializable in python3 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4c89e440 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4c89e440 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4c89e440 Branch: refs/heads/master Commit: 4c89e440ef34b43142ef7f61a2fb6424dfc7f00f Parents: d1f94fe Author: Hongyi WangAuthored: Tue Apr 3 18:10:42 2018 -0700 Committer: Hongyi Wang Committed: Tue Apr 3 18:10:42 2018 -0700 -- airflow/contrib/operators/mysql_to_gcs.py | 26 ++- .../operators/test_mysql_to_gcs_operator.py | 47 2 files changed, 63 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c89e440/airflow/contrib/operators/mysql_to_gcs.py -- diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py index 2249b9c..4e238ca 100644 --- a/airflow/contrib/operators/mysql_to_gcs.py +++ b/airflow/contrib/operators/mysql_to_gcs.py @@ -24,7 +24,7 @@ from datetime import date, datetime from decimal import Decimal from MySQLdb.constants import FIELD_TYPE from tempfile import NamedTemporaryFile -from six import string_types +from six import string_types, binary_type PY3 = sys.version_info[0] == 3 @@ -130,6 +130,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): names in GCS, and values are file handles to local files that contain the data for the GCS objects. 
""" +class BinaryTypeEncoder(json.JSONEncoder): +def default(self, obj): +if PY3 and isinstance(obj, binary_type): +return str(obj, 'utf-8') +return json.JSONEncoder.default(self, obj) + schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description)) file_no = 0 tmp_file_handle = NamedTemporaryFile(delete=True) @@ -141,7 +147,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): row_dict = dict(zip(schema, row)) # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB. -s = json.dumps(row_dict) +s = json.dumps(row_dict, cls=BinaryTypeEncoder) if PY3: s = s.encode('utf-8') tmp_file_handle.write(s) @@ -166,12 +172,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): name in GCS, and values are file handles to local files that contains the BigQuery schema fields in .json format. """ -schema = [] +schema_str = None tmp_schema_file_handle = NamedTemporaryFile(delete=True) if self.schema is not None and isinstance(self.schema, string_types): -schema = self.schema -tmp_schema_file_handle.write(schema) +schema_str = self.schema else: +schema = [] if self.schema is not None and isinstance(self.schema, list): schema = self.schema else: @@ -191,12 +197,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): 'type': field_type, 'mode': field_mode, }) -s = json.dumps(schema, tmp_schema_file_handle) -if PY3: -s = s.encode('utf-8') -tmp_schema_file_handle.write(s) +schema_str = json.dumps(schema) +if PY3: +schema_str = schema_str.encode('utf-8') +tmp_schema_file_handle.write(schema_str) -self.log.info('Using schema for %s: %s', self.schema_filename, schema) +self.log.info('Using schema for %s: %s', self.schema_filename, schema_str) return {self.schema_filename: tmp_schema_file_handle} def _upload_to_gcs(self, files_to_upload): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4c89e440/tests/contrib/operators/test_mysql_to_gcs_operator.py -- diff --git a/tests/contrib/operators/test_mysql_to_gcs_operator.py 
b/tests/contrib/operators/test_mysql_to_gcs_operator.py new file mode 100644 index 000..c985ac3 --- /dev/null +++ b/tests/contrib/operators/test_mysql_to_gcs_operator.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +#
[2/2] incubator-airflow git commit: Merge pull request #3177 from whynick1/master
Merge pull request #3177 from whynick1/master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f865c789 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f865c789 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f865c789 Branch: refs/heads/master Commit: f865c78988eeab5cb989c2dc6eaa3e2be8aaac57 Parents: 9c0c426 4c89e44 Author: Joy Gao Authored: Wed Apr 4 17:50:12 2018 -0700 Committer: Joy Gao Committed: Wed Apr 4 17:50:12 2018 -0700 -- airflow/contrib/operators/mysql_to_gcs.py | 26 ++- .../operators/test_mysql_to_gcs_operator.py | 47 2 files changed, 63 insertions(+), 10 deletions(-) --
[jira] [Created] (AIRFLOW-2283) Multi-Tenant security vulnerability
Garrett Summers created AIRFLOW-2283: Summary: Multi-Tenant security vulnerability Key: AIRFLOW-2283 URL: https://issues.apache.org/jira/browse/AIRFLOW-2283 Project: Apache Airflow Issue Type: Bug Components: models, scheduler, security, webserver Affects Versions: Airflow 1.8 Environment: Any/All Reporter: Garrett Summers We noticed what we think to be a potential security vulnerability when importing dag files in the following line: {{m = imp.load_source(mod_name, filepath)}} This line in the DagBag.process_file code imports the dag files available, but this causes all of the code in the file to actually execute (which could be any arbitrary code). If the dags for different tenants are being stored in a common dag structure (even though they are filtered for the different tenants) then the arbitrary code execution would make it possible for one tenant to access/modify the dags of other tenants. This would be a major problem for users who utilize the multi-tenant functionality in Airflow. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work started] (AIRFLOW-2282) Fix grammar in UPDATING.md
[ https://issues.apache.org/jira/browse/AIRFLOW-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2282 started by Taylor Edmiston. > Fix grammar in UPDATING.md > -- > > Key: AIRFLOW-2282 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2282 > Project: Apache Airflow > Issue Type: Task >Reporter: Taylor Edmiston >Assignee: Taylor Edmiston >Priority: Trivial > Labels: documentation > > Fixes a small grammatical typo in UPDATING.md. Also auto removes some > trailing whitespace in another .md file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work stopped] (AIRFLOW-2282) Fix grammar in UPDATING.md
[ https://issues.apache.org/jira/browse/AIRFLOW-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2282 stopped by Taylor Edmiston. > Fix grammar in UPDATING.md > -- > > Key: AIRFLOW-2282 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2282 > Project: Apache Airflow > Issue Type: Task >Reporter: Taylor Edmiston >Assignee: Taylor Edmiston >Priority: Trivial > Labels: documentation > > Fixes a small grammatical typo in UPDATING.md. Also auto removes some > trailing whitespace in another .md file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2282) Fix grammar in UPDATING.md
[ https://issues.apache.org/jira/browse/AIRFLOW-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16426190#comment-16426190 ] Taylor Edmiston commented on AIRFLOW-2282: -- PR: https://github.com/apache/incubator-airflow/pull/3189 > Fix grammar in UPDATING.md > -- > > Key: AIRFLOW-2282 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2282 > Project: Apache Airflow > Issue Type: Task >Reporter: Taylor Edmiston >Assignee: Taylor Edmiston >Priority: Trivial > Labels: documentation > > Fixes a small grammatical typo in UPDATING.md. Also auto removes some > trailing whitespace in another .md file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Work started] (AIRFLOW-2282) Fix grammar in UPDATING.md
[ https://issues.apache.org/jira/browse/AIRFLOW-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2282 started by Taylor Edmiston. > Fix grammar in UPDATING.md > -- > > Key: AIRFLOW-2282 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2282 > Project: Apache Airflow > Issue Type: Task >Reporter: Taylor Edmiston >Assignee: Taylor Edmiston >Priority: Trivial > Labels: documentation > > Fixes a small grammatical typo in UPDATING.md. Also auto removes some > trailing whitespace in another .md file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-2282) Fix grammar in UPDATING.md
[ https://issues.apache.org/jira/browse/AIRFLOW-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Taylor Edmiston updated AIRFLOW-2282: - Summary: Fix grammar in UPDATING.md (was: Fix grammar in UPDATING.md and remove trailing whitespace) > Fix grammar in UPDATING.md > -- > > Key: AIRFLOW-2282 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2282 > Project: Apache Airflow > Issue Type: Task >Reporter: Taylor Edmiston >Assignee: Taylor Edmiston >Priority: Trivial > Labels: documentation > > Fixes a small grammatical typo in UPDATING.md. Also auto removes some > trailing whitespace in another .md file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2282) Fix grammar in UPDATING.md and remove trailing whitespace
Taylor Edmiston created AIRFLOW-2282: Summary: Fix grammar in UPDATING.md and remove trailing whitespace Key: AIRFLOW-2282 URL: https://issues.apache.org/jira/browse/AIRFLOW-2282 Project: Apache Airflow Issue Type: Task Reporter: Taylor Edmiston Assignee: Taylor Edmiston Fixes a small grammatical typo in UPDATING.md. Also auto removes some trailing whitespace in another .md file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-2281) Add support for Sendgrid categories
[ https://issues.apache.org/jira/browse/AIRFLOW-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marcin Szymanski updated AIRFLOW-2281: -- Summary: Add support for Sendgrid categories (was: Add support for Sendgrid category) > Add support for Sendgrid categories > --- > > Key: AIRFLOW-2281 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2281 > Project: Apache Airflow > Issue Type: New Feature > Components: contrib >Reporter: Marcin Szymanski >Assignee: Marcin Szymanski >Priority: Minor > > Allow adding categories to sendgrid mails > https://github.com/sendgrid/sendgrid-python/blob/master/sendgrid/helpers/mail/category.py -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2281) Add support for Sendgrid category
Marcin Szymanski created AIRFLOW-2281: - Summary: Add support for Sendgrid category Key: AIRFLOW-2281 URL: https://issues.apache.org/jira/browse/AIRFLOW-2281 Project: Apache Airflow Issue Type: New Feature Components: contrib Reporter: Marcin Szymanski Assignee: Marcin Szymanski Allow adding categories to sendgrid mails https://github.com/sendgrid/sendgrid-python/blob/master/sendgrid/helpers/mail/category.py -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-1156) Using a timedelta object as a Schedule Interval with catchup=False causes the start_date to no longer be honored.
[ https://issues.apache.org/jira/browse/AIRFLOW-1156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425378#comment-16425378 ] Antti Ruokomäki commented on AIRFLOW-1156: -- Additionally, this seems to occur with a regular cron expression, "0 21 * * *" > Using a timedelta object as a Schedule Interval with catchup=False causes the > start_date to no longer be honored. > - > > Key: AIRFLOW-1156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1156 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: Airflow 1.8 >Reporter: Zachary Lawson >Priority: Minor > > Currently, in Airflow v1.8, if you set your schedule_interval to a timedelta > object and set catchup=False, the start_date is no longer honored and the DAG > is scheduled immediately upon unpausing the DAG. It is then scheduled on the > schedule interval from that point onward. Example below: > {code} > from airflow import DAG > from datetime import datetime, timedelta > import logging > from airflow.operators.python_operator import PythonOperator > default_args = { > 'owner': 'airflow', > 'depends_on_past': False, > 'start_date': datetime(2015, 6, 1), > } > dag = DAG('test', default_args=default_args, > schedule_interval=timedelta(seconds=5), catchup=False) > def context_test(ds, **context): > logging.info('testing') > test_context = PythonOperator( > task_id='test_context', > provide_context=True, > python_callable=context_test, > dag=dag > ) > {code} > If you switch the above over to a CRON expression, the behavior of the > scheduling is returned to the expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-1156) Using a timedelta object as a Schedule Interval with catchup=False causes the start_date to no longer be honored.
[ https://issues.apache.org/jira/browse/AIRFLOW-1156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425366#comment-16425366 ] Antti Ruokomäki commented on AIRFLOW-1156: -- I've noticed similar behavior, when schedule is "@daily" > Using a timedelta object as a Schedule Interval with catchup=False causes the > start_date to no longer be honored. > - > > Key: AIRFLOW-1156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1156 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: Airflow 1.8 >Reporter: Zachary Lawson >Priority: Minor > > Currently, in Airflow v1.8, if you set your schedule_interval to a timedelta > object and set catchup=False, the start_date is no longer honored and the DAG > is scheduled immediately upon unpausing the DAG. It is then scheduled on the > schedule interval from that point onward. Example below: > {code} > from airflow import DAG > from datetime import datetime, timedelta > import logging > from airflow.operators.python_operator import PythonOperator > default_args = { > 'owner': 'airflow', > 'depends_on_past': False, > 'start_date': datetime(2015, 6, 1), > } > dag = DAG('test', default_args=default_args, > schedule_interval=timedelta(seconds=5), catchup=False) > def context_test(ds, **context): > logging.info('testing') > test_context = PythonOperator( > task_id='test_context', > provide_context=True, > python_callable=context_test, > dag=dag > ) > {code} > If you switch the above over to a CRON expression, the behavior of the > scheduling is returned to the expected. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (AIRFLOW-2278) Add possibility to edit DAGs in webapp
[ https://issues.apache.org/jira/browse/AIRFLOW-2278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425127#comment-16425127 ] Mykola Mykhalov edited comment on AIRFLOW-2278 at 4/4/18 7:30 AM: -- [~johnarnold] It uses natural way of DAGs refresh after editing, so it works with remote executors and separated services on different hosts. Just to be sure for 100%, I've tested it on cluster where we use Celery Executor and couple of servers. was (Author: miho): It uses natural way of DAGs refresh after editing, so it works with remote executors and separated services on different hosts. Just to be sure for 100%, I've tested it on cluster where we use Celery Executor and couple of servers. > Add possibility to edit DAGs in webapp > -- > > Key: AIRFLOW-2278 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2278 > Project: Apache Airflow > Issue Type: Improvement > Components: webapp >Reporter: Mykola Mykhalov >Assignee: Mykola Mykhalov >Priority: Minor > > When you need to make some minor changes in your DAG would be nice to have > possibility to edit it from webapp. > To protect DAG from editing by everyone should be set up arg inside DAG > 'editable_by'. Where '*' means everyone, and ['user1', 'user2'] means > specific users. > Example when DAG can be editable in webapp by user1 and user2 only: > args = { > 'owner': 'airflow', > 'editable_by': ['user1', 'user2'], > 'start_date': airflow.utils.dates.days_ago(2) > } -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2278) Add possibility to edit DAGs in webapp
[ https://issues.apache.org/jira/browse/AIRFLOW-2278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425127#comment-16425127 ] Mykola Mykhalov commented on AIRFLOW-2278: -- It uses natural way of DAGs refresh after editing, so it works with remote executors and separated services on different hosts. Just to be sure for 100%, I've tested it on cluster where we use Celery Executor and couple of servers. > Add possibility to edit DAGs in webapp > -- > > Key: AIRFLOW-2278 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2278 > Project: Apache Airflow > Issue Type: Improvement > Components: webapp >Reporter: Mykola Mykhalov >Assignee: Mykola Mykhalov >Priority: Minor > > When you need to make some minor changes in your DAG would be nice to have > possibility to edit it from webapp. > To protect DAG from editing by everyone should be set up arg inside DAG > 'editable_by'. Where '*' means everyone, and ['user1', 'user2'] means > specific users. > Example when DAG can be editable in webapp by user1 and user2 only: > args = { > 'owner': 'airflow', > 'editable_by': ['user1', 'user2'], > 'start_date': airflow.utils.dates.days_ago(2) > } -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2178) Scheduler can't get past SLA check if SMTP settings are incorrect
[ https://issues.apache.org/jira/browse/AIRFLOW-2178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2178. --- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3173 [https://github.com/apache/incubator-airflow/pull/3173] > Scheduler can't get past SLA check if SMTP settings are incorrect > - > > Key: AIRFLOW-2178 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2178 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.9.0 > Environment: 16.04 >Reporter: James Meickle >Assignee: David Klosowski >Priority: Major > Fix For: 2.0.0 > > Attachments: log.txt > > > After testing Airflow for a while in staging, I provisioned our prod cluster > and enabled the first DAG on it. The "backfill" for this DAG performed just > fine, so I assumed everything was working and left it over the weekend. > However, when the last "backfill" period completed and the scheduler > transitioned to the most recent execution date, it began failing in the > `manage_slas` method. Due to a configuration difference, SMTP was timing out > in production, preventing the SLA check from ever completing; this both > blocked SLA notifications, as well as prevented further tasks in this DAG > from ever getting scheduled. > As an operator, I would expect Airflow to treat scheduling tasks as a > higher-priority concern, and to do so even if the SLA feature fails to work. I > would also expect Airflow to notify me in the web UI that email sending is > not currently working. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2178) Scheduler can't get past SLA check if SMTP settings are incorrect
[ https://issues.apache.org/jira/browse/AIRFLOW-2178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16425118#comment-16425118 ] ASF subversion and git services commented on AIRFLOW-2178: -- Commit 9c0c4264c3ecdee2d11c0be9d2a151ea423dd3d9 in incubator-airflow's branch refs/heads/master from [~d3cay] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=9c0c426 ] [AIRFLOW-2178] Add handling on SLA miss errors Closes #3173 from d3cay1/airflow2178-master > Scheduler can't get past SLA check if SMTP settings are incorrect > - > > Key: AIRFLOW-2178 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2178 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.9.0 > Environment: 16.04 >Reporter: James Meickle >Assignee: David Klosowski >Priority: Major > Attachments: log.txt > > > After testing Airflow for a while in staging, I provisioned our prod cluster > and enabled the first DAG on it. The "backfill" for this DAG performed just > fine, so I assumed everything was working and left it over the weekend. > However, when the last "backfill" period completed and the scheduler > transitioned to the most recent execution date, it began failing in the > `manage_slas` method. Due to a configuration difference, SMTP was timing out > in production, preventing the SLA check from ever completing; this both > blocked SLA notifications, as well as prevented further tasks in this DAG > from ever getting scheduled. > As an operator, I would expect Airflow to treat scheduling tasks as a > higher-priority concern, and to do so even if the SLA feature fails to work. I > would also expect Airflow to notify me in the web UI that email sending is > not currently working. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2178] Add handling on SLA miss errors
Repository: incubator-airflow Updated Branches: refs/heads/master d1f94fe20 -> 9c0c4264c [AIRFLOW-2178] Add handling on SLA miss errors Closes #3173 from d3cay1/airflow2178-master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9c0c4264 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9c0c4264 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9c0c4264 Branch: refs/heads/master Commit: 9c0c4264c3ecdee2d11c0be9d2a151ea423dd3d9 Parents: d1f94fe Author: David KlosowskiAuthored: Wed Apr 4 09:19:59 2018 +0200 Committer: Fokko Driesprong Committed: Wed Apr 4 09:19:59 2018 +0200 -- airflow/jobs.py | 27 +- tests/jobs.py | 103 +-- 2 files changed, 109 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c0c4264/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index 13fc2a2..6241717 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -24,7 +24,6 @@ import os import psutil import signal import six -import socket import sys import threading import time @@ -43,7 +42,6 @@ from time import sleep from airflow import configuration as conf from airflow import executors, models, settings from airflow.exceptions import AirflowException -from airflow.logging_config import configure_logging from airflow.models import DAG, DagRun from airflow.settings import Stats from airflow.task.task_runner import get_task_runner @@ -672,8 +670,13 @@ class SchedulerJob(BaseJob): if dag.sla_miss_callback: # Execute the alert callback self.log.info(' --> ABOUT TO CALL SLA MISS CALL BACK ') -dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, blocking_tis) -notification_sent = True +try: +dag.sla_miss_callback(dag, task_list, blocking_task_list, slas, + blocking_tis) +notification_sent = True +except Exception: +self.log.exception("Could not call sla_miss_callback for DAG %s", + dag.dag_id) 
email_content = """\ Here's a list of tasks that missed their SLAs: {task_list}\n @@ -691,12 +694,16 @@ class SchedulerJob(BaseJob): if email not in emails: emails.append(email) if emails and len(slas): -send_email( -emails, -"[airflow] SLA miss on DAG=" + dag.dag_id, -email_content) -email_sent = True -notification_sent = True +try: +send_email( +emails, +"[airflow] SLA miss on DAG=" + dag.dag_id, +email_content) +email_sent = True +notification_sent = True +except Exception: +self.log.exception("Could not send SLA Miss email notification for" + " DAG %s", dag.dag_id) # If we sent any notification, update the sla_miss table if notification_sent: for sla in slas: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9c0c4264/tests/jobs.py -- diff --git a/tests/jobs.py b/tests/jobs.py index ace593a..1e411e2 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -23,7 +23,6 @@ import multiprocessing import os import shutil import six -import socket import threading import time import unittest @@ -46,7 +45,7 @@ from airflow.utils.timeout import timeout from airflow.utils.dag_processing import SimpleDag, SimpleDagBag, list_py_file_paths from airflow.utils.net import get_hostname -from mock import Mock, patch +from mock import Mock, patch, MagicMock, PropertyMock from sqlalchemy.orm.session import make_transient from tests.executors.test_executor import TestExecutor @@ -95,7 +94,7 @@ class BackfillJobTest(unittest.TestCase): target_dag.clear() scheduler = SchedulerJob() -queue = mock.Mock() +queue = Mock() scheduler._process_task_instances(target_dag, queue=queue) self.assertFalse(queue.append.called) @@ -108,7 +107,7 @@ class BackfillJobTest(unittest.TestCase): job.run() scheduler = SchedulerJob() -queue =