[jira] [Commented] (AIRFLOW-78) airflow clear leaves dag_runs
[ https://issues.apache.org/jira/browse/AIRFLOW-78?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15282358#comment-15282358 ]

Siddharth Anand commented on AIRFLOW-78:
----------------------------------------

[~abridgett] Can you tell me more about your expectations of clear?

> airflow clear leaves dag_runs
> -----------------------------
>
>                 Key: AIRFLOW-78
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-78
>             Project: Apache Airflow
>          Issue Type: Wish
>          Components: cli
>    Affects Versions: Airflow 1.6.2
>            Reporter: Adrian Bridgett
>            Assignee: Siddharth Anand
>            Priority: Minor
>
> (moved from https://github.com/apache/incubator-airflow/issues/829)
> "airflow clear -c -d -s 2016-01-03 dagid" doesn't clear the dagrun, it sets
> it to running instead (apparently since this is often used to re-run jobs).
> However this then breaks max_active_runs=1 (I have to stop the scheduler,
> then airflow clear, psql to delete the dagrun, then start the scheduler).
> This problem was probably seen on an Airflow 1.6.x install.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
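The manual workaround described in the issue (stop the scheduler, `airflow clear`, then delete the dag_run row via psql) can be sketched as follows. This is a minimal illustration against an in-memory SQLite stand-in for the metadata database; the table layout is a simplified assumption, not Airflow's real dag_run schema.

```python
import sqlite3

# Stand-in for Airflow's metadata database with a simplified dag_run table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT)")

# After "airflow clear", the dag_run row is left in the 'running' state.
conn.execute("INSERT INTO dag_run VALUES ('dagid', '2016-01-03', 'running')")

# Equivalent of the reporter's psql step: delete the stale run so that
# max_active_runs=1 no longer counts it against the limit.
conn.execute(
    "DELETE FROM dag_run WHERE dag_id = ? AND execution_date = ?",
    ("dagid", "2016-01-03"),
)
remaining = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
print(remaining)  # 0
```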
[jira] [Closed] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Siddharth Anand closed AIRFLOW-106.
-----------------------------------

> Task Retries, on_failure callback, and email_on_failure Not Honored if First
> Task in a DAG Fails
> ----------------------------------------------------------------------------
>
>                 Key: AIRFLOW-106
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-106
>             Project: Apache Airflow
>          Issue Type: Bug
>         Environment: Latest version from Git
>            Reporter: dud
>            Assignee: Siddharth Anand
>            Priority: Blocker
>         Attachments: If the failed task is not the first it works.png,
>                      screenshot-1.png
>
> Hello.
> I created the following workflow:
> {code}
> from airflow import DAG
> from airflow.operators import PythonOperator
> from datetime import datetime, timedelta
> from airflow.models import Variable
> from time import sleep
>
> default_args = {
>     'depends_on_past': False,
>     'start_date': datetime(2016, 5, 11, 15, 20),
>     'email':
>     'email_on_failure': True,
>     'email_on_retry': False,
>     'retries': 1,
>     'retry_delay': timedelta(minutes=2),
>     'end_date': datetime(2016, 5, 11, 16, 00),
> }
>
> PARENT_DAG_NAME = 'test'
> dag = DAG(PARENT_DAG_NAME, default_args=default_args,
>           schedule_interval=timedelta(minutes=10))
>
> def sleep1_function(**kwargs):
>     sleep(90)
>     return Variable.get('test_var')
>
> sleep1 = PythonOperator(
>     task_id='sleep1',
>     python_callable=sleep1_function,
>     dag=dag)
> {code}
> I forgot to declare test_var, so when this DAG launched it failed quickly.
> However, no failure email was ever sent. Clearing the failed task to make it
> rerun doesn't trigger any email.
> Here are the logs:
> {code}
> [2016-05-11 15:53:31,784] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,272] {models.py:157} INFO - Filling up the DagBag from /var/lib/airflow/airflow/dags/test.py
> [2016-05-11 15:53:32,313] {models.py:1216} INFO -
> Starting attempt 1 of 2
> [2016-05-11 15:53:32,333] {models.py:1239} INFO - Executing  on 2016-05-11 15:20:00
> [2016-05-11 15:55:03,450] {models.py:1306} ERROR - Variable test_var does not exist
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 1265, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/operators/python_operator.py", line 66, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/var/lib/airflow/airflow/dags/test.py", line 31, in sleep1_function
>     return Variable.get('test_var')
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.0-py2.7.egg/airflow/models.py", line 3145, in get
>     raise ValueError('Variable {} does not exist'.format(key))
> ValueError: Variable test_var does not exist
> [2016-05-11 15:55:03,581] {models.py:1318} INFO - Marking task as UP_FOR_RETRY
> [2016-05-11 15:55:03,759] {models.py:1347} ERROR - Variable test_var does not exist
> {code}
> In the DAG Runs page, the workflow is set as failed. In the task instance
> page, it is set as up_for_retry, but no new run is ever scheduled.
> I tried incrementing the retries parameter, but nothing different happens;
> Airflow never retries after the first run.
> dud

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
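The root cause in the log above is `Variable.get` raising `ValueError` for an undeclared variable. A defensive pattern in the callable is to catch the lookup failure and fall back deliberately. The sketch below uses a plain dict as a hypothetical stand-in for Airflow's Variable store (the real call is `airflow.models.Variable.get`); only the guard pattern is the point.

```python
_variables = {}  # hypothetical stand-in for Airflow's Variable table

def get_var(key, default=None):
    """Look up a variable, raising the same ValueError Airflow raises,
    unless an explicit fallback value is supplied."""
    try:
        return _variables[key]
    except KeyError:
        if default is not None:
            return default
        raise ValueError('Variable {} does not exist'.format(key))

# With a fallback, the task callable no longer fails on a missing variable.
print(get_var('test_var', default='fallback'))  # fallback
```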
[1/2] incubator-airflow git commit: Fix : Don't treat premature tasks as could_not_run tasks
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 31f01b838 -> 10d70d9d7

Fix : Don't treat premature tasks as could_not_run tasks

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ab5d4459
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ab5d4459
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ab5d4459

Branch: refs/heads/master
Commit: ab5d445992617585a0ced1d81881a0728f49b13a
Parents: dddfd3b
Author: Siddharth Anand
Authored: Thu May 12 03:37:51 2016 +
Committer: Siddharth Anand
Committed: Fri May 13 01:39:39 2016 +

----------------------------------------------------------------------
 airflow/jobs.py   |  3 +++
 airflow/models.py | 11 ++-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab5d4459/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index b21a196..7244d84 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -535,7 +535,10 @@ class SchedulerJob(BaseJob):
             elif ti.is_runnable(flag_upstream_failed=True):
                 self.logger.debug('Queuing task: {}'.format(ti))
                 queue.put((ti.key, pickle_id))
+            elif ti.is_premature():
+                continue
             else:
+                self.logger.debug('Adding task: {} to the COULD_NOT_RUN set'.format(ti))
                 could_not_run.add(ti)

         # this type of deadlock happens when dagruns can't even start and so

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ab5d4459/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index 7618bd5..0f664a0 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -898,7 +898,7 @@ class TaskInstance(Base):
         if self.execution_date > datetime.now():
             return False
         # is the task still in the retry waiting period?
-        elif self.state == State.UP_FOR_RETRY and not self.ready_for_retry():
+        elif self.is_premature():
             return False
         # does the task have an end_date prior to the execution date?
         elif self.task.end_date and self.execution_date > self.task.end_date:
@@ -920,6 +920,15 @@ class TaskInstance(Base):
         else:
             return False

+    def is_premature(self):
+        """
+        Returns whether a task is in UP_FOR_RETRY state and its retry interval
+        has not yet elapsed.
+        """
+        # is the task still in the retry waiting period?
+        return self.state == State.UP_FOR_RETRY and not self.ready_for_retry()
+
     def is_runnable(
             self,
             include_queued=False,
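The semantics of the patch above can be shown with a toy model: a task in UP_FOR_RETRY whose retry delay has not yet elapsed is "premature" and is simply skipped by the scheduler loop instead of being added to the could_not_run set (which would otherwise trigger deadlock handling). The `Task` class below is an illustrative stand-in, not Airflow's TaskInstance.

```python
from datetime import datetime, timedelta

UP_FOR_RETRY = "up_for_retry"

class Task:
    """Toy stand-in for a TaskInstance, modelling only retry timing."""
    def __init__(self, state, end_date, retry_delay):
        self.state = state
        self.end_date = end_date          # when the last attempt finished
        self.retry_delay = retry_delay

    def ready_for_retry(self):
        # Retryable once the configured delay since the last failure has passed.
        return (self.state == UP_FOR_RETRY and
                self.end_date + self.retry_delay < datetime.now())

    def is_premature(self):
        # Still inside the retry waiting period.
        return self.state == UP_FOR_RETRY and not self.ready_for_retry()

# Failed just now with a 2-minute retry delay: premature, so the scheduler
# skips it rather than counting it as could_not_run.
t = Task(UP_FOR_RETRY, datetime.now(), timedelta(minutes=2))
print(t.is_premature())  # True
```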
[jira] [Updated] (AIRFLOW-110) Point people to the appropriate process to submit PRs in the repository's CONTRIBUTING.md
[ https://issues.apache.org/jira/browse/AIRFLOW-110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated AIRFLOW-110:
------------------------------------
    Labels: documentation newbie  (was: documentation)

> Point people to the appropriate process to submit PRs in the repository's
> CONTRIBUTING.md
> --------------------------------------------------------------------------
>
>                 Key: AIRFLOW-110
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-110
>             Project: Apache Airflow
>          Issue Type: Task
>          Components: docs
>            Reporter: Arthur Wiedmer
>            Priority: Trivial
>              Labels: documentation, newbie
>
> The current process to contribute code could be made more accessible. I am
> assuming that the entry point to the project is GitHub and the repository. We
> could modify the CONTRIBUTING.md as well as the README to point to the proper
> way to do this.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (AIRFLOW-110) Point people to the appropriate process to submit PRs in the repository's CONTRIBUTING.md
Arthur Wiedmer created AIRFLOW-110:
--------------------------------------

             Summary: Point people to the appropriate process to submit PRs in the repository's CONTRIBUTING.md
                 Key: AIRFLOW-110
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-110
             Project: Apache Airflow
          Issue Type: Task
          Components: docs
            Reporter: Arthur Wiedmer
            Priority: Trivial

The current process to contribute code could be made more accessible. I am
assuming that the entry point to the project is GitHub and the repository. We
could modify the CONTRIBUTING.md as well as the README to point to the proper
way to do this.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (AIRFLOW-109) PrestoHook get_pandas_df executes a method that can raise outside of the try catch statement.
Arthur Wiedmer created AIRFLOW-109:
--------------------------------------

             Summary: PrestoHook get_pandas_df executes a method that can raise outside of the try/catch statement
                 Key: AIRFLOW-109
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-109
             Project: Apache Airflow
          Issue Type: Bug
          Components: hooks
    Affects Versions: Airflow 1.8, Airflow 1.7.1, Airflow 1.6.2
            Reporter: Arthur Wiedmer
            Assignee: Arthur Wiedmer
            Priority: Minor

This issue occurs when a malformed SQL statement is passed to the
get_pandas_df method of the Presto hook. PyHive raises a DatabaseError outside
of the try/catch, leading to the wrong kind of error being raised.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
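The likely fix direction (an assumption on my part, not the committed patch) is to keep the raising call inside the try block so a driver-level `DatabaseError` is translated into the hook's own exception type. All classes and functions below are illustrative stand-ins for PyHive and the Presto hook:

```python
class DatabaseError(Exception):
    """Stand-in for pyhive's DatabaseError."""

class PrestoException(Exception):
    """Stand-in for the hook's own error type."""

def run_query(sql):
    # Stand-in for the cursor call that raises on malformed SQL.
    if "bad" in sql:
        raise DatabaseError("syntax error")
    return [(1,)]

def get_records(sql):
    # The raising call sits INSIDE try/except, so callers only ever see
    # the hook's exception type, never the raw driver error.
    try:
        return run_query(sql)
    except DatabaseError as e:
        raise PrestoException(str(e))

try:
    get_records("bad sql")
except PrestoException as e:
    print("caught:", e)  # caught: syntax error
```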
[jira] [Updated] (AIRFLOW-96) s3_conn_id of s3KeySensor cannot be defined using an environment variable
[ https://issues.apache.org/jira/browse/AIRFLOW-96?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini updated AIRFLOW-96:
-----------------------------------
    Assignee: dmtran

> s3_conn_id of s3KeySensor cannot be defined using an environment variable
> -------------------------------------------------------------------------
>
>                 Key: AIRFLOW-96
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-96
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>    Affects Versions: Airflow 1.6.2
>         Environment: Python Version: 2.7.11
>                      Operating System: OS X El Capitan 10.11.4
>            Reporter: dmtran
>            Assignee: dmtran
>            Priority: Minor
>
> According to https://pythonhosted.org/airflow/concepts.html#connections,
> Airflow has the ability to reference connections via environment variables
> from the operating system. The environment variable needs to be prefixed with
> AIRFLOW_CONN_ to be considered a connection.
> This doesn't work with an S3KeySensor; the following exception is raised:
> {noformat}
> [2016-05-10 17:01:37,101] {models.py:1041} ERROR - conn_id doesn't exist in the repository
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1000, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 65, in execute
>     return_value = self.python_callable(*self.op_args, **self.op_kwargs)
>   File "/Users/dmtran/git_repos/coordinator/airflow/dags/test-s3.py", line 24, in check_key_in_s3
>     s3_conn_id='S3_CONNECTION')
>   File "/usr/local/lib/python2.7/site-packages/airflow/utils.py", line 461, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/site-packages/airflow/operators/sensors.py", line 332, in __init__
>     raise AirflowException("conn_id doesn't exist in the repository")
> AirflowException: conn_id doesn't exist in the repository
> {noformat}
> You can reproduce this issue by triggering a DAGrun of the following DAG:
> {code}
> from airflow.hooks.base_hook import CONN_ENV_PREFIX
> from airflow.operators import *
> from airflow.models import DAG
> from datetime import datetime
> import os
>
> args = {
>     'owner': 'airflow',
>     'start_date': datetime(2016, 5, 10, 7)
> }
>
> dag = DAG(dag_id='test-s3',
>           default_args=args,
>           schedule_interval=None)
>
> def check_key_in_s3(**context):
>     os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = '{ "aws_access_key_id": "dummyAccessKey", "aws_secret_access_key": "dummySecretKey" }'
>     sensor = S3KeySensor(
>         task_id='s3keysensor',
>         bucket_name='dummy_bucket',
>         bucket_key='dummy_key',
>         dag=dag,
>         s3_conn_id='S3_CONNECTION')
>     sensor.execute(context)
>
> check_s3_key_operator = PythonOperator(
>     task_id='check_key_in_s3',
>     python_callable=check_key_in_s3,
>     provide_context=True,
>     dag=dag)
> {code}
> The exception is raised because of the following lines in method __init__ of
> class S3KeySensor:
> {code}
> db = session.query(DB).filter(DB.conn_id == s3_conn_id).first()
> if not db:
>     raise AirflowException("conn_id doesn't exist in the repository")
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
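The quoted `__init__` snippet shows why the environment variable is ignored: the sensor queries the metadata database unconditionally. A fix in the direction the issue implies (this is a sketch of the idea, not the committed patch; `resolve_conn` and `db_lookup` are hypothetical names) would consult the `AIRFLOW_CONN_`-prefixed environment variable first and only then fall back to the database:

```python
import json
import os

CONN_ENV_PREFIX = 'AIRFLOW_CONN_'  # matches the prefix documented above

def resolve_conn(conn_id, db_lookup):
    """Resolve a connection: environment variable first, database second.
    `db_lookup` is a hypothetical callable standing in for the session query."""
    env_value = os.environ.get(CONN_ENV_PREFIX + conn_id.upper())
    if env_value is not None:
        return json.loads(env_value)
    conn = db_lookup(conn_id)
    if conn is None:
        raise Exception("conn_id doesn't exist in the repository")
    return conn

# Mimic the repro DAG: define the connection purely via the environment.
os.environ[CONN_ENV_PREFIX + 'S3_CONNECTION'] = (
    '{"aws_access_key_id": "dummyAccessKey", '
    '"aws_secret_access_key": "dummySecretKey"}')

# The db_lookup returning None shows the env var alone is now sufficient.
conn = resolve_conn('s3_connection', db_lookup=lambda cid: None)
print(conn['aws_access_key_id'])  # dummyAccessKey
```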
[jira] [Commented] (AIRFLOW-96) s3_conn_id of s3KeySensor cannot be defined using an environment variable
[ https://issues.apache.org/jira/browse/AIRFLOW-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15282026#comment-15282026 ]

dmtran commented on AIRFLOW-96:
-------------------------------

I guess you mean the S3PrefixSensor also needs to be updated. Sure, I'll give
this a try.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (AIRFLOW-91) Ssl gunicorn
[ https://issues.apache.org/jira/browse/AIRFLOW-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281817#comment-15281817 ]

Chris Riccomini commented on AIRFLOW-91:
----------------------------------------

(and reverted due to useless config in the PR, according to
[~maxime.beauche...@apache.org])

> Ssl gunicorn
> ------------
>
>                 Key: AIRFLOW-91
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-91
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: security
>            Reporter: Stanilovsky Evgeny
>            Assignee: Stanilovsky Evgeny
>
> old issue: https://github.com/apache/incubator-airflow/pull/1492
> Ssl gunicorn support

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (AIRFLOW-91) Ssl gunicorn
[ https://issues.apache.org/jira/browse/AIRFLOW-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281810#comment-15281810 ]

Chris Riccomini commented on AIRFLOW-91:
----------------------------------------

https://github.com/apache/incubator-airflow/pull/1491
https://github.com/apache/incubator-airflow/pull/1492

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Commented] (AIRFLOW-91) Ssl gunicorn
[ https://issues.apache.org/jira/browse/AIRFLOW-91?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281809#comment-15281809 ]

Chris Riccomini commented on AIRFLOW-91:
----------------------------------------

Reverted. Apparently there are multiple PRs for this. My bad.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
incubator-airflow git commit: ssl gunicorn support
Repository: incubator-airflow
Updated Branches:
  refs/heads/master dddfd3b5b -> e332f6362

ssl gunicorn support

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e332f636
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e332f636
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e332f636

Branch: refs/heads/master
Commit: e332f63620a5f85e38c4a1a5ac9c9a4a5bfc6035
Parents: dddfd3b
Author: Stanilovsky Evgeny
Authored: Thu May 12 09:58:40 2016 +0300
Committer: Stanilovsky Evgeny
Committed: Thu May 12 10:07:45 2016 +0300

----------------------------------------------------------------------
 airflow/bin/cli.py       | 21 +
 airflow/configuration.py |  8
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e332f636/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index d735bad..f2dceee 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -410,10 +410,17 @@ def webserver(args):
         app.run(debug=True, port=args.port, host=args.hostname)
     else:
         pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
+        secure_params = args.ssl_certfile and args.ssl_keyfile
+        if secure_params:
+            sec_params = ['--certfile=' + args.ssl_certfile,
+                          '--keyfile=' + args.ssl_keyfile]
+        else:
+            sec_params = []
         print(
             'Running the Gunicorn server with {workers} {args.workerclass}'
             'workers on host {args.hostname} and port '
-            '{args.port} with a timeout of {worker_timeout}...'.format(**locals()))
+            '{args.port} with a timeout of {worker_timeout},'
+            ' secure={secure_params}'.format(**locals()))
         run_args = [
             'gunicorn',
             '-w ' + str(args.workers),
@@ -421,7 +428,7 @@ def webserver(args):
             '-t ' + str(args.worker_timeout),
             '-b ' + args.hostname + ':' + str(args.port),
             '-n ' + 'airflow-webserver',
-            '-p ' + str(pid)]
+            '-p ' + str(pid)] + sec_params

         if args.daemon:
             run_args.append("-D")
@@ -776,6 +783,12 @@ class CLIFactory(object):
             ("-hn", "--hostname"),
             default=conf.get('webserver', 'WEB_SERVER_HOST'),
             help="Set the hostname on which to run the web server"),
+        'ssl_certfile': Arg(
+            ("-scf", "--ssl_certfile"),
+            help="ssl certificate file"),
+        'ssl_keyfile': Arg(
+            ("-skf", "--ssl_keyfile"),
+            help="ssl key file"),
         'debug': Arg(
             ("-d", "--debug"),
             "Use the server that ships with Flask in debug mode",
@@ -902,8 +915,8 @@ class CLIFactory(object):
         'func': webserver,
         'help': "Start a Airflow webserver instance",
         'args': ('port', 'workers', 'workerclass', 'worker_timeout', 'hostname',
-                 'pid', 'daemon', 'stdout', 'stderr', 'log_file',
-                 'debug'),
+                 'pid', 'daemon', 'stdout', 'stderr', 'log_file', 'ssl_certfile',
+                 'ssl_keyfile', 'debug'),
     }, {
         'func': resetdb,
         'help': "Burn down and rebuild the metadata database",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e332f636/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 173eddb..6478891 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -109,6 +109,8 @@ defaults = {
         'expose_config': False,
         'workers': 4,
         'worker_class': 'sync',
+        'ssl_certfile': None,
+        'ssl_keyfile': None,
     },
     'scheduler': {
         'statsd_on': False,
@@ -263,6 +265,12 @@ authenticate = False
 # Filter the list of dags by owner name (requires authentication to be enabled)
 filter_by_owner = False

+# SSL certificate file
+# ssl_certfile =
+
+# SSL key file
+# ssl_keyfile =
+
 [email]
 email_backend = airflow.utils.email.send_email_smtp
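The core of the commit above is the `sec_params` construction: SSL flags are appended to the gunicorn command line only when both a certfile and a keyfile are supplied. A minimal standalone sketch of that logic:

```python
def build_ssl_args(ssl_certfile, ssl_keyfile):
    """Mirror of the patch's sec_params logic: emit gunicorn SSL flags
    only when BOTH the certificate and the key file are provided."""
    if ssl_certfile and ssl_keyfile:
        return ['--certfile=' + ssl_certfile, '--keyfile=' + ssl_keyfile]
    return []

print(build_ssl_args('/etc/ssl/airflow.crt', '/etc/ssl/airflow.key'))
# ['--certfile=/etc/ssl/airflow.crt', '--keyfile=/etc/ssl/airflow.key']
print(build_ssl_args(None, '/etc/ssl/airflow.key'))  # []
```

Requiring both arguments avoids handing gunicorn a half-configured TLS setup; with only one of the two it silently falls back to plain HTTP, which matches the patch's behavior.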
[jira] [Commented] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281723#comment-15281723 ]

Siddharth Anand commented on AIRFLOW-106:
-----------------------------------------

If the failed task is not the first in the dagrun, the behavior is correct
before and after my fix. As above, the first 4 dag runs are before the fix and
the last dag run is after my fix.

!If the failed task is not the first it works.png!

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Siddharth Anand updated AIRFLOW-106:
------------------------------------
    Attachment: If the failed task is not the first it works.png

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Updated] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Siddharth Anand updated AIRFLOW-106:
------------------------------------
    Attachment: (was: bug_works_if_not_first_task_that_fails.png)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Issue Comment Deleted] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Siddharth Anand updated AIRFLOW-106: Comment: was deleted (was: [~sanand], on your last screenshot, why is the DAG run still showing as running even though the sleep1 task is failed?) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (AIRFLOW-108) Add data retention policy to Airflow
[ https://issues.apache.org/jira/browse/AIRFLOW-108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281634#comment-15281634 ] Chris Riccomini commented on AIRFLOW-108: - I haven't fully thought through how this would be implemented, but we have 100s of DAGs that run every 15m. The DB is starting to get larger and larger. The DB size isn't a problem now, but I'm worried that it might be shortly. > Add data retention policy to Airflow > > > Key: AIRFLOW-108 > URL: https://issues.apache.org/jira/browse/AIRFLOW-108 > Project: Apache Airflow > Issue Type: Wish > Components: db, scheduler >Reporter: Chris Riccomini > > Airflow's DB currently holds the entire history of all executions for all > time. This is problematic as the DB grows. The UI starts to get slower, and > the DB's disk usage grows. There is no bound to how large the DB will grow. > It would be useful to add a feature in Airflow to do two things: > # Delete old data from the DB > # Mark some lower watermark, past which DAG executions are ignored > For example, (2) would allow you to tell the scheduler "ignore all data prior > to a year ago". And (1) would allow Airflow to delete all data prior to > January 1, 2015. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
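The two requested features can be sketched against a toy metadata table. This is an illustrative sqlite3 sketch under assumed names, not Airflow's actual schema: the `task_instance` table here has only the columns the example needs, and the watermark date is made up.

```python
import sqlite3

# Toy stand-in for Airflow's metadata DB; schema and data are assumptions.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE task_instance (task_id TEXT, execution_date TEXT)')
conn.executemany('INSERT INTO task_instance VALUES (?, ?)',
                 [('sleep1', '2014-06-01'), ('sleep1', '2016-05-11')])

# (1) Delete all data prior to January 1, 2015 (ISO dates compare lexically).
conn.execute("DELETE FROM task_instance WHERE execution_date < '2015-01-01'")

# (2) A lower watermark the scheduler would consult instead of deleting:
#     rows older than the watermark stay on disk but are ignored.
WATERMARK = '2015-05-11'
visible = conn.execute(
    'SELECT COUNT(*) FROM task_instance WHERE execution_date >= ?',
    (WATERMARK,)).fetchone()[0]
print(visible)  # prints 1
```

The design difference matters: (1) bounds disk usage but loses history, while (2) keeps the history queryable and only bounds what the scheduler and UI have to scan.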
[jira] [Updated] (AIRFLOW-108) Add data retention policy to Airflow
[ https://issues.apache.org/jira/browse/AIRFLOW-108?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini updated AIRFLOW-108: Issue Type: Wish (was: New Feature) Summary: Add data retention policy to Airflow (was: Add data retention to Airflow) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281615#comment-15281615 ] Chris Riccomini commented on AIRFLOW-106: - [~sanand], on your last screenshot, why is the DAG run still showing as running even though the sleep1 task is failed? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281583#comment-15281583 ] Siddharth Anand commented on AIRFLOW-106: - This shows the bug behavior if the task to fail is not the first one. !bug_works_if_not_first_task_that_fails.png! -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (AIRFLOW-106) Task Retries, on_failure callback, and email_on_failure Not Honored if First Task in a DAG Fails
[ https://issues.apache.org/jira/browse/AIRFLOW-106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Siddharth Anand updated AIRFLOW-106: Attachment: bug_works_if_not_first_task_that_fails.png This shows that if the task that fails is not the first one, then the anti-deadlock code does not FAIL the DagRun and leave tasks in incomplete states. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (AIRFLOW-91) Ssl gunicorn
[ https://issues.apache.org/jira/browse/AIRFLOW-91?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanilovsky Evgeny updated AIRFLOW-91: -- Description: old issue : https://github.com/apache/incubator-airflow/pull/1492 Ssl gunicorn support was: old issue : https://github.com/apache/incubator-airflow/pull/1439 Ssl gunicorn support > Ssl gunicorn > > > Key: AIRFLOW-91 > URL: https://issues.apache.org/jira/browse/AIRFLOW-91 > Project: Apache Airflow > Issue Type: Improvement > Components: security >Reporter: Stanilovsky Evgeny > > old issue : https://github.com/apache/incubator-airflow/pull/1492 > Ssl gunicorn support -- This message was sent by Atlassian JIRA (v6.3.4#6332)
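For context on what "Ssl gunicorn support" would entail: gunicorn itself already accepts SSL settings via its Python config file. A hypothetical `gunicorn.conf.py` fragment (the `certfile`/`keyfile` option names are standard gunicorn settings, but the paths below are placeholders, and how the linked PR wires these through Airflow's webserver is not shown in this thread):

```python
# gunicorn.conf.py -- hypothetical sketch; certfile/keyfile are standard
# gunicorn settings, but both paths below are placeholder assumptions.
certfile = '/etc/ssl/certs/airflow.crt'
keyfile = '/etc/ssl/private/airflow.key'
```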