[jira] [Created] (AIRFLOW-962) Dag runs are deadlocked for DAG scheduled for @once
Sheikh Aslam Ahmed created AIRFLOW-962: -- Summary: Dag runs are deadlocked for DAG scheduled for @once Key: AIRFLOW-962 URL: https://issues.apache.org/jira/browse/AIRFLOW-962 Project: Apache Airflow Issue Type: Bug Components: DagRun Environment: Airflow v1.7.1.3 installed on an AWS cluster node with the Celery executor, a RabbitMQ service, and MySQL for the metastore Reporter: Sheikh Aslam Ahmed We read rows from a MySQL table in a Python script and create DAGs from them; any change to a row leads to a change in the corresponding DAG. We schedule each DAG based on its inputs (e.g. @once, hourly, daily). For the last few days, hourly and other scheduled profiles have run properly, but profiles scheduled with @once have failed (in the web UI, no task was picked up from the DAG). The logs show the error '{jobs.py:538} ERROR - Dag runs are deadlocked for DAG: dagname'. After that, if we trigger a manual run of the same DAG from the web UI, it runs and completes successfully. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
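As a rough mental model of what "deadlocked" means here (an assumption for illustration, not a reading of the 1.7.1.3 source): a DAG run is stuck when some of its tasks are unfinished but none of them can have its dependencies met. The sketch below models only that rule; `is_deadlocked`, the task names, and the "every upstream must be `success`" dependency rule are hypothetical simplifications.

```python
from typing import Dict, List

# States treated as terminal in this toy model.
FINISHED = {"success", "failed", "upstream_failed", "skipped"}


def is_deadlocked(states: Dict[str, str], upstream: Dict[str, List[str]]) -> bool:
    """True if some tasks are unfinished but none of them could start.

    Simplified, hypothetical rule: a task may start only when every one of
    its upstream tasks is in state "success".
    """
    unfinished = [t for t, s in states.items() if s not in FINISHED]
    if not unfinished:
        return False  # nothing left to do, so nothing can be stuck

    def dependencies_met(task: str) -> bool:
        return all(states[u] == "success" for u in upstream.get(task, []))

    # Deadlocked only if not a single unfinished task is able to start.
    return not any(dependencies_met(t) for t in unfinished)


upstream = {"load": ["extract"], "report": ["load"]}
print(is_deadlocked({"extract": "failed", "load": "none", "report": "none"}, upstream))
```

In real Airflow a failed upstream task would normally push its downstream tasks to `upstream_failed` rather than leave them waiting; the example only exercises the rule itself and says nothing about why the @once runs in this report got stuck.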
[jira] [Commented] (AIRFLOW-82) Add configuration option in airflow.cfg for Flask APPLICATION_ROOT
[ https://issues.apache.org/jira/browse/AIRFLOW-82?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903978#comment-15903978 ] Ivan Levchenko commented on AIRFLOW-82: --- This would be a big deal for me. I am currently rewriting HTML in Apache to get this to work. > Add configuration option in airflow.cfg for Flask APPLICATION_ROOT > -- > > Key: AIRFLOW-82 > URL: https://issues.apache.org/jira/browse/AIRFLOW-82 > Project: Apache Airflow > Issue Type: Wish >Reporter: Sean McIntyre >Priority: Minor > Labels: feature, suggestions > > For various reasons, I might not have the flexibility to use a domain or > subdomain root (e.g., airflow.myorg.com) as the entry point to Airflow. > Instead I may have to use something like myorg.com/airflow/. > Fortunately Flask has the APPLICATION_ROOT configuration option which makes > this easy if URLs are rendered with the Flask url_for method. > I believe this feature can be added in two steps: > 1. Add configuration option logic to flow from airflow.cfg to instantiation > of Flask. > 2. Go through existing template files and convert to using Flask url_for > methods. > 3. (hidden step) Encourage other contributors to use url_for going forward. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
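A common workaround for serving a WSGI application under a sub-path, while no such option exists, is to move the prefix from `PATH_INFO` into `SCRIPT_NAME`; Werkzeug-based URL builders such as Flask's `url_for` include `SCRIPT_NAME` in the links they generate during a request. The sketch below is plain WSGI with no Flask or Airflow dependency; `PrefixMiddleware`, `demo_app` and the `/airflow` prefix are hypothetical names chosen for illustration.

```python
from typing import Callable, Iterable, List


class PrefixMiddleware:
    """Serve a WSGI app under a URL prefix such as /airflow (hypothetical)."""

    def __init__(self, app: Callable, prefix: str) -> None:
        self.app = app
        # Normalise "/airflow/", "airflow" etc. to "/airflow".
        self.prefix = "/" + prefix.strip("/")

    def __call__(self, environ: dict, start_response: Callable) -> Iterable[bytes]:
        path = environ.get("PATH_INFO", "")
        if path == self.prefix or path.startswith(self.prefix + "/"):
            # Move the prefix into SCRIPT_NAME so URL builders that honour it
            # (e.g. Werkzeug/Flask url_for) prepend it to generated links.
            environ["SCRIPT_NAME"] = environ.get("SCRIPT_NAME", "") + self.prefix
            environ["PATH_INFO"] = path[len(self.prefix):] or "/"
            return self.app(environ, start_response)
        # Requests outside the prefix are not routed to the wrapped app.
        start_response("404 Not Found", [("Content-Type", "text/plain")])
        return [b"Not found"]


def demo_app(environ: dict, start_response: Callable) -> List[bytes]:
    """Echo what the wrapped application sees, to show the rewrite."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    body = "SCRIPT_NAME=%s PATH_INFO=%s" % (environ["SCRIPT_NAME"], environ["PATH_INFO"])
    return [body.encode("utf-8")]
```

Links generated inside the wrapped app would carry the prefix only if templates build them with `url_for` rather than hard-coded paths, which is why step 2 in the issue matters.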
[2/2] incubator-airflow git commit: Merge pull request #2137 from gwax/editorconfig
Merge pull request #2137 from gwax/editorconfig Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2a613620 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2a613620 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2a613620 Branch: refs/heads/master Commit: 2a6136202d3ab4a92dda892d100c2820c0b45702 Parents: 8e2003e f5cacca Author: Arthur Wiedmer Authored: Thu Mar 9 11:53:57 2017 -0800 Committer: Arthur Wiedmer Committed: Thu Mar 9 11:53:57 2017 -0800 -- .editorconfig | 32 1 file changed, 32 insertions(+) --
[jira] [Created] (AIRFLOW-961) LocalTaskJob onkill should get run on TERM
Alex Guziel created AIRFLOW-961: --- Summary: LocalTaskJob onkill should get run on TERM Key: AIRFLOW-961 URL: https://issues.apache.org/jira/browse/AIRFLOW-961 Project: Apache Airflow Issue Type: Bug Reporter: Alex Guziel Assignee: Alex Guziel Right now, on_kill is only called in the finally block; it should also be called when the job receives a SIGTERM. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
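Background for this issue: Python installs no SIGTERM handler by default, so a plain `kill -TERM` ends the process without unwinding the stack, and `finally` blocks never run. A minimal sketch of the idea, assuming a POSIX system and execution in the main thread (the `ToyTaskJob` class and its methods are hypothetical, not Airflow's `LocalTaskJob`), installs a handler that runs the cleanup hook immediately and then raises so normal unwinding still happens:

```python
import os
import signal
import time


class ToyTaskJob:
    """Hypothetical job wrapper; not Airflow's LocalTaskJob."""

    def __init__(self) -> None:
        self.cleanups = 0
        self._cleaned_up = False

    def on_kill(self) -> None:
        # Idempotent cleanup: may be reached from the handler and from finally.
        if self._cleaned_up:
            return
        self._cleaned_up = True
        self.cleanups += 1

    def execute(self, work) -> None:
        def handle_sigterm(signum, frame):
            # Clean up right away, then raise so the stack unwinds normally.
            self.on_kill()
            raise SystemExit(128 + signum)

        previous = signal.signal(signal.SIGTERM, handle_sigterm)
        try:
            work()
        finally:
            # Reached on normal exit, on errors and, thanks to the handler,
            # on SIGTERM; then the previous handler is restored.
            self.on_kill()
            signal.signal(signal.SIGTERM, previous)


def send_sigterm_to_self() -> None:
    os.kill(os.getpid(), signal.SIGTERM)
    time.sleep(1)  # the handler fires before this sleep completes
```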
[jira] [Resolved] (AIRFLOW-960) Add support for .editorconfig
[ https://issues.apache.org/jira/browse/AIRFLOW-960?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arthur Wiedmer resolved AIRFLOW-960. Resolution: Fixed Fix Version/s: 1.9.0 Issue resolved by pull request #2137 [https://github.com/apache/incubator-airflow/pull/2137] > Add support for .editorconfig > - > > Key: AIRFLOW-960 > URL: https://issues.apache.org/jira/browse/AIRFLOW-960 > Project: Apache Airflow > Issue Type: Improvement >Reporter: George Leslie-Waksman >Assignee: George Leslie-Waksman >Priority: Trivial > Fix For: 1.9.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-960) Add support for .editorconfig
[ https://issues.apache.org/jira/browse/AIRFLOW-960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903743#comment-15903743 ] ASF subversion and git services commented on AIRFLOW-960: - Commit f5caccacd01bfc51d2b1d268886e5ab0b29497be in incubator-airflow's branch refs/heads/master from [~gwax] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=f5cacca ] AIRFLOW-960 Add .editorconfig file > Add support for .editorconfig > - > > Key: AIRFLOW-960 > URL: https://issues.apache.org/jira/browse/AIRFLOW-960 > Project: Apache Airflow > Issue Type: Improvement >Reporter: George Leslie-Waksman >Assignee: George Leslie-Waksman >Priority: Trivial > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[1/2] incubator-airflow git commit: AIRFLOW-960 Add .editorconfig file
Repository: incubator-airflow
Updated Branches: refs/heads/master 8e2003e37 -> 2a6136202

AIRFLOW-960 Add .editorconfig file

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f5caccac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f5caccac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f5caccac

Branch: refs/heads/master
Commit: f5caccacd01bfc51d2b1d268886e5ab0b29497be
Parents: e423981
Author: George Leslie-Waksman
Authored: Thu Mar 9 11:23:06 2017 -0800
Committer: George Leslie-Waksman
Committed: Thu Mar 9 11:36:41 2017 -0800

--
 .editorconfig | 32
 1 file changed, 32 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f5caccac/.editorconfig
--
diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 000..a80bd65
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,32 @@
+root = true
+
+[*]
+end_of_line = lf
+indent_style = space
+insert_final_newline = true
+trim_trailing_whitespace = true
+charset = utf-8
+
+[*.py]
+indent_size = 4
+
+[*.sh]
+indent_size = 4
+
+[*.sql]
+indent_size = 4
+
+[*.js]
+indent_size = 2
+
+[*.css]
+indent_size = 2
+
+[*.{md,rst}]
+indent_size = 2
+
+[*.{yml,yaml}]
+indent_size = 2
+
+[*.{htm,html}]
+indent_size = 2
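EditorConfig-aware editors apply every section whose glob matches a file, and later sections take precedence over earlier ones for the same key. The following is a simplified resolver sketch run against an excerpt of this file; `parse`, `resolve` and `expand_braces` are hypothetical helpers, and the sketch deliberately ignores parts of the real specification (it matches basenames only and ignores `root`).

```python
import fnmatch
import re
from typing import Dict, List, Tuple

# Excerpt of the .editorconfig added in this commit.
EDITORCONFIG = """\
root = true

[*]
end_of_line = lf
indent_style = space
charset = utf-8

[*.py]
indent_size = 4

[*.{md,rst}]
indent_size = 2
"""


def expand_braces(pattern: str) -> List[str]:
    """Expand each {a,b} group (no nesting): '*.{md,rst}' -> ['*.md', '*.rst']."""
    match = re.search(r"\{([^{}]*)\}", pattern)
    if not match:
        return [pattern]
    head, tail = pattern[:match.start()], pattern[match.end():]
    results: List[str] = []
    for option in match.group(1).split(","):
        results.extend(expand_braces(head + option + tail))
    return results


def parse(text: str) -> List[Tuple[str, Dict[str, str]]]:
    """Return (glob, {key: value}) sections in file order."""
    sections: List[Tuple[str, Dict[str, str]]] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", ";")):
            continue
        if line.startswith("[") and line.endswith("]"):
            sections.append((line[1:-1], {}))
        elif "=" in line and sections:
            key, value = (part.strip() for part in line.split("=", 1))
            sections[-1][1][key] = value
        # Preamble pairs such as "root = true" are ignored in this sketch.
    return sections


def resolve(sections: List[Tuple[str, Dict[str, str]]], path: str) -> Dict[str, str]:
    """Merge properties of every matching section; later sections win."""
    basename = path.rsplit("/", 1)[-1]
    props: Dict[str, str] = {}
    for glob, values in sections:
        if any(fnmatch.fnmatchcase(basename, g) for g in expand_braces(glob)):
            props.update(values)
    return props
```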
[jira] [Resolved] (AIRFLOW-959) .gitignore file is disorganized and incomplete
[ https://issues.apache.org/jira/browse/AIRFLOW-959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arthur Wiedmer resolved AIRFLOW-959. Resolution: Fixed Fix Version/s: 1.9.0 Issue resolved by pull request #2136 [https://github.com/apache/incubator-airflow/pull/2136] > .gitignore file is disorganized and incomplete > -- > > Key: AIRFLOW-959 > URL: https://issues.apache.org/jira/browse/AIRFLOW-959 > Project: Apache Airflow > Issue Type: Bug >Reporter: George Leslie-Waksman >Assignee: George Leslie-Waksman >Priority: Trivial > Fix For: 1.9.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-959) .gitignore file is disorganized and incomplete
[ https://issues.apache.org/jira/browse/AIRFLOW-959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903738#comment-15903738 ] ASF subversion and git services commented on AIRFLOW-959: - Commit 3d3c1485a7cc480f58609f97e287a1e263638dbe in incubator-airflow's branch refs/heads/master from [~gwax] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=3d3c148 ] AIRFLOW-959 Cleanup and reorganize .gitignore > .gitignore file is disorganized and incomplete > -- > > Key: AIRFLOW-959 > URL: https://issues.apache.org/jira/browse/AIRFLOW-959 > Project: Apache Airflow > Issue Type: Bug >Reporter: George Leslie-Waksman >Assignee: George Leslie-Waksman >Priority: Trivial > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[1/2] incubator-airflow git commit: AIRFLOW-959 Cleanup and reorganize .gitignore
Repository: incubator-airflow
Updated Branches: refs/heads/master e42398100 -> 8e2003e37

AIRFLOW-959 Cleanup and reorganize .gitignore

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3d3c1485
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3d3c1485
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3d3c1485

Branch: refs/heads/master
Commit: 3d3c1485a7cc480f58609f97e287a1e263638dbe
Parents: e423981
Author: George Leslie-Waksman
Authored: Thu Mar 9 10:49:02 2017 -0800
Committer: George Leslie-Waksman
Committed: Thu Mar 9 11:37:49 2017 -0800

--
 .gitignore | 144 ++--
 1 file changed, 119 insertions(+), 25 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3d3c1485/.gitignore
--
diff --git a/.gitignore b/.gitignore
index 694a561..f0e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,32 +1,126 @@
-.idea/*
-*.bkp
-*.egg-info
-*.pyc
-.DS_Store
-.ipynb*
-.coverage
-.python-version
-airflow/git_version
-airflow/www/static/coverage/
-airflow.db
+# Airflow configuration
 airflow.cfg
+unittests.cfg
 airflow_login.py
-build
-cover
 dbinit.py
-docs/_*
-dist
-env
 initdb.py
-logs
-MANIFEST
 secrets.py
-sftp-config.json
-unittests.cfg
-error.log
+
+# Airflow sqlite databases
+airflow.db
 unittests.db
-rat-results.txt
-/.eggs/
-/.tox/
-venv
+
+# Airflow temporary artifacts
+airflow/git_version
+airflow/www/static/coverage/
+logs/
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+env/
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*,cover
+.hypothesis/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# dotenv
+.env
+
+# virtualenv
+.venv
+venv/
+ENV/
+
+# Spyder project settings
+.spyderproject
+
+# Rope project settings
+.ropeproject
+
+# PyCharm
+.idea/
+
+# vim
 *.swp
+
+# OSX
+.DS_Store
+
+# SQL Server backups
+*.bkp
+
+# Spark
+rat-results.txt
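To sanity-check which of the new basename-only patterns (those without a `/`) catch common artifacts, they can be approximated with Python's `fnmatch`. This is a rough sketch under that assumption: Git's real rules (directory-only patterns ending in `/`, anchoring, `**`, negation) are not modelled, `first_match` is a hypothetical helper, and `git check-ignore -v <path>` remains the authoritative check.

```python
from fnmatch import fnmatchcase
from typing import Iterable, Optional

# A few basename-only patterns taken from the new .gitignore in this commit.
PATTERNS = ["*.py[cod]", "*$py.class", "*.so", "*.egg", "*.log", "*.swp", "*.bkp", ".DS_Store"]


def first_match(filename: str, patterns: Iterable[str] = PATTERNS) -> Optional[str]:
    """Return the first pattern whose glob matches the file's basename, if any."""
    basename = filename.rsplit("/", 1)[-1]
    for pattern in patterns:
        # fnmatchcase: case-sensitive, like Git on most platforms.
        if fnmatchcase(basename, pattern):
            return pattern
    return None
```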
[2/2] incubator-airflow git commit: Merge pull request #2136 from gwax/update-gitignore
Merge pull request #2136 from gwax/update-gitignore Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8e2003e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8e2003e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8e2003e3 Branch: refs/heads/master Commit: 8e2003e37a105be214266bb24d1104c32cc72816 Parents: e423981 3d3c148 Author: Arthur Wiedmer Authored: Thu Mar 9 11:52:10 2017 -0800 Committer: Arthur Wiedmer Committed: Thu Mar 9 11:52:10 2017 -0800 -- .gitignore | 144 ++-- 1 file changed, 119 insertions(+), 25 deletions(-) --
[jira] [Commented] (AIRFLOW-959) .gitignore file is disorganized and incomplete
[ https://issues.apache.org/jira/browse/AIRFLOW-959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903695#comment-15903695 ] Arthur Wiedmer commented on AIRFLOW-959: +1 > .gitignore file is disorganized and incomplete > -- > > Key: AIRFLOW-959 > URL: https://issues.apache.org/jira/browse/AIRFLOW-959 > Project: Apache Airflow > Issue Type: Bug >Reporter: George Leslie-Waksman >Assignee: George Leslie-Waksman >Priority: Trivial > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-959) .gitignore file is disorganized and incomplete
George Leslie-Waksman created AIRFLOW-959: - Summary: .gitignore file is disorganized and incomplete Key: AIRFLOW-959 URL: https://issues.apache.org/jira/browse/AIRFLOW-959 Project: Apache Airflow Issue Type: Improvement Reporter: George Leslie-Waksman Assignee: George Leslie-Waksman Priority: Trivial -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AIRFLOW-959) .gitignore file is disorganized and incomplete
[ https://issues.apache.org/jira/browse/AIRFLOW-959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] George Leslie-Waksman updated AIRFLOW-959: -- Issue Type: Bug (was: Improvement) > .gitignore file is disorganized and incomplete > -- > > Key: AIRFLOW-959 > URL: https://issues.apache.org/jira/browse/AIRFLOW-959 > Project: Apache Airflow > Issue Type: Bug >Reporter: George Leslie-Waksman >Assignee: George Leslie-Waksman >Priority: Trivial > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-960) Add support for .editorconfig
George Leslie-Waksman created AIRFLOW-960: - Summary: Add support for .editorconfig Key: AIRFLOW-960 URL: https://issues.apache.org/jira/browse/AIRFLOW-960 Project: Apache Airflow Issue Type: Improvement Reporter: George Leslie-Waksman Assignee: George Leslie-Waksman Priority: Trivial -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-931) LocalExecutor fails to run queued task with race condition
[ https://issues.apache.org/jira/browse/AIRFLOW-931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1590#comment-1590 ] ASF subversion and git services commented on AIRFLOW-931: - Commit 07d40d7cdae0a1601c4cc396bb737e44f19cb398 in incubator-airflow's branch refs/heads/v1-8-stable from [~bolke] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=07d40d7 ] [AIRFLOW-931] Do not set QUEUED in TaskInstances The contract of TaskInstances stipulates that end states for Tasks can only be UP_FOR_RETRY, SUCCESS, FAILED, UPSTREAM_FAILED or SKIPPED. If concurrency was reached task instances were set to QUEUED by the task instance themselves. This would prevent the scheduler to pick them up again. We set the state to NONE now, to ensure integrity. Closes #2127 from bolkedebruin/AIRFLOW-931 (cherry picked from commit e42398100a3248eddb6b511ade73f6a239e58090) Signed-off-by: Bolke de Bruin > LocalExecutor fails to run queued task with race condition > -- > > Key: AIRFLOW-931 > URL: https://issues.apache.org/jira/browse/AIRFLOW-931 > Project: Apache Airflow > Issue Type: Sub-task >Affects Versions: Airflow 1.8, 1.8.0rc4 >Reporter: Vijay Krishna Ramesh >Assignee: Bolke de Bruin > > https://gist.github.com/vijaykramesh/707262c83429ab2a3f5ee701879813e3 > provides a small example that consistently hits this problem with > LocalExecutor. > Basically when the dag run kicks off (with concurrency > 1) and a > LocalExecutor with parallelism > 2 the scheduler marks more than concurrency > tasks as queued > (https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L1095) > There is a second check before actually running the task > (https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1291) > that leaves the task in the QUEUED state but then the scheduler never picks > it back up.
This causes the DAG to get stuck (as the queued tasks never run) > until the scheduler is restarted (at which point the enqueued tasks are > considered orphaned, the status is set to NONE, and then they are picked up > by the scheduler again and run. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
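The failure mode described above, and why the fix resets the state to NONE, can be illustrated with a toy simulation. Everything here is a simplification assumed for illustration (the `simulate` function, the round-based scheduler and executor, and the reduced set of states); it is not how Airflow's scheduler or LocalExecutor are implemented.

```python
from enum import Enum
from typing import Dict


class State(Enum):
    NONE = "none"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"


def simulate(num_tasks: int, concurrency: int, state_on_reject: State,
             rounds: int = 5) -> Dict[str, State]:
    """Toy model of the race in AIRFLOW-931; not Airflow's real scheduler.

    state_on_reject is the state a task writes for itself when, at run time,
    it finds the concurrency limit already reached (QUEUED before the fix,
    NONE after it).
    """
    states = {"task_%d" % i: State.NONE for i in range(num_tasks)}
    for _ in range(rounds):
        # Scheduler: only picks up tasks in state NONE and, as in the report,
        # queues all of them without checking the concurrency limit.
        handed_over = [t for t, s in states.items() if s is State.NONE]
        for t in handed_over:
            states[t] = State.QUEUED

        # Executor: each handed-over task re-checks concurrency before running.
        running = 0
        for t in handed_over:
            if running < concurrency:
                states[t] = State.RUNNING
                running += 1
            else:
                states[t] = state_on_reject

        # Every task that started finishes before the next scheduler pass.
        for t, s in list(states.items()):
            if s is State.RUNNING:
                states[t] = State.SUCCESS
    return states
```

With `State.QUEUED`, two of four tasks stay queued forever because the toy scheduler never looks at them again; with `State.NONE`, the rejected tasks are picked up on the next pass and all four succeed.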
[jira] [Resolved] (AIRFLOW-931) LocalExecutor fails to run queued task with race condition
[ https://issues.apache.org/jira/browse/AIRFLOW-931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-931. Resolution: Fixed Fix Version/s: 1.8.0 Issue resolved by pull request #2127 [https://github.com/apache/incubator-airflow/pull/2127] > LocalExecutor fails to run queued task with race condition > -- > > Key: AIRFLOW-931 > URL: https://issues.apache.org/jira/browse/AIRFLOW-931 > Project: Apache Airflow > Issue Type: Sub-task >Affects Versions: Airflow 1.8, 1.8.0rc4 >Reporter: Vijay Krishna Ramesh >Assignee: Bolke de Bruin > Fix For: 1.8.0 > > > https://gist.github.com/vijaykramesh/707262c83429ab2a3f5ee701879813e3 > provides a small example that consistently hits this problem with > LocalExecutor. > Basically when the dag run kicks off (with concurrency > 1) and a > LocalExecutor with parallelism > 2 the scheduler marks more than concurrency > tasks as queued > (https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L1095) > There is a second check before actually running the task > (https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1291) > that leaves the task in the QUEUED state but then the scheduler never picks > it back up. This causes the DAG to get stuck (as the queued tasks never run) > until the scheduler is restarted (at which point the enqueued tasks are > considered orphaned, the status is set to NONE, and then they are picked up > by the scheduler again and run. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-931) LocalExecutor fails to run queued task with race condition
[ https://issues.apache.org/jira/browse/AIRFLOW-931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903331#comment-15903331 ] ASF subversion and git services commented on AIRFLOW-931: - Commit 4db8f0796642691255b0632d599f33cb9d0ce423 in incubator-airflow's branch refs/heads/v1-8-test from [~bolke] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4db8f07 ] [AIRFLOW-931] Do not set QUEUED in TaskInstances The contract of TaskInstances stipulates that end states for Tasks can only be UP_FOR_RETRY, SUCCESS, FAILED, UPSTREAM_FAILED or SKIPPED. If concurrency was reached task instances were set to QUEUED by the task instance themselves. This would prevent the scheduler to pick them up again. We set the state to NONE now, to ensure integrity. Closes #2127 from bolkedebruin/AIRFLOW-931 (cherry picked from commit e42398100a3248eddb6b511ade73f6a239e58090) Signed-off-by: Bolke de Bruin > LocalExecutor fails to run queued task with race condition > -- > > Key: AIRFLOW-931 > URL: https://issues.apache.org/jira/browse/AIRFLOW-931 > Project: Apache Airflow > Issue Type: Sub-task >Affects Versions: Airflow 1.8, 1.8.0rc4 >Reporter: Vijay Krishna Ramesh >Assignee: Bolke de Bruin > > https://gist.github.com/vijaykramesh/707262c83429ab2a3f5ee701879813e3 > provides a small example that consistently hits this problem with > LocalExecutor. > Basically when the dag run kicks off (with concurrency > 1) and a > LocalExecutor with parallelism > 2 the scheduler marks more than concurrency > tasks as queued > (https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L1095) > There is a second check before actually running the task > (https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1291) > that leaves the task in the QUEUED state but then the scheduler never picks > it back up.
This causes the DAG to get stuck (as the queued tasks never run) > until the scheduler is restarted (at which point the enqueued tasks are > considered orphaned, the status is set to NONE, and then they are picked up > by the scheduler again and run. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-931) LocalExecutor fails to run queued task with race condition
[ https://issues.apache.org/jira/browse/AIRFLOW-931?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15903329#comment-15903329 ] ASF subversion and git services commented on AIRFLOW-931: - Commit e42398100a3248eddb6b511ade73f6a239e58090 in incubator-airflow's branch refs/heads/master from [~bolke] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e423981 ] [AIRFLOW-931] Do not set QUEUED in TaskInstances The contract of TaskInstances stipulates that end states for Tasks can only be UP_FOR_RETRY, SUCCESS, FAILED, UPSTREAM_FAILED or SKIPPED. If concurrency was reached task instances were set to QUEUED by the task instance themselves. This would prevent the scheduler to pick them up again. We set the state to NONE now, to ensure integrity. Closes #2127 from bolkedebruin/AIRFLOW-931 > LocalExecutor fails to run queued task with race condition > -- > > Key: AIRFLOW-931 > URL: https://issues.apache.org/jira/browse/AIRFLOW-931 > Project: Apache Airflow > Issue Type: Sub-task >Affects Versions: Airflow 1.8, 1.8.0rc4 >Reporter: Vijay Krishna Ramesh >Assignee: Bolke de Bruin > > https://gist.github.com/vijaykramesh/707262c83429ab2a3f5ee701879813e3 > provides a small example that consistently hits this problem with > LocalExecutor. > Basically when the dag run kicks off (with concurrency > 1) and a > LocalExecutor with parallelism > 2 the scheduler marks more than concurrency > tasks as queued > (https://github.com/apache/incubator-airflow/blob/master/airflow/jobs.py#L1095) > There is a second check before actually running the task > (https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1291) > that leaves the task in the QUEUED state but then the scheduler never picks > it back up. 
This causes the DAG to get stuck (as the queued tasks never run) > until the scheduler is restarted (at which point the enqueued tasks are > considered orphaned, the status is set to NONE, and then they are picked up > by the scheduler again and run. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-931] Do not set QUEUED in TaskInstances
Repository: incubator-airflow
Updated Branches: refs/heads/v1-8-stable 50f9ed8c7 -> 07d40d7cd

[AIRFLOW-931] Do not set QUEUED in TaskInstances

The contract of TaskInstances stipulates that end states for Tasks can only be UP_FOR_RETRY, SUCCESS, FAILED, UPSTREAM_FAILED or SKIPPED. If concurrency was reached task instances were set to QUEUED by the task instance themselves. This would prevent the scheduler to pick them up again. We set the state to NONE now, to ensure integrity.

Closes #2127 from bolkedebruin/AIRFLOW-931

(cherry picked from commit e42398100a3248eddb6b511ade73f6a239e58090)
Signed-off-by: Bolke de Bruin

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/07d40d7c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/07d40d7c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/07d40d7c

Branch: refs/heads/v1-8-stable
Commit: 07d40d7cdae0a1601c4cc396bb737e44f19cb398
Parents: 50f9ed8
Author: Bolke de Bruin
Authored: Thu Mar 9 08:32:46 2017 -0800
Committer: Bolke de Bruin
Committed: Thu Mar 9 08:33:17 2017 -0800

--
 airflow/models.py | 27 ++-
 tests/models.py | 13 +
 2 files changed, 27 insertions(+), 13 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07d40d7c/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index ba8d051..62457f0 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1291,19 +1291,20 @@ class TaskInstance(Base):
                 verbose=True)
 
         if not runnable and not mark_success:
-            if self.state != State.QUEUED:
-                # If a task's dependencies are met but it can't be run yet then queue it
-                # instead
-                self.state = State.QUEUED
-                msg = "Queuing attempt {attempt} of {total}".format(
-                    attempt=self.try_number % (task.retries + 1) + 1,
-                    total=task.retries + 1)
-                logging.info(hr + msg + hr)
-
-                self.queued_dttm = datetime.now()
-                msg = "Queuing into pool {}".format(self.pool)
-                logging.info(msg)
-                session.merge(self)
+            # FIXME: we might have hit concurrency limits, which means we probably
+            # have been running prematurely. This should be handled in the
+            # scheduling mechanism.
+            self.state = State.NONE
+            msg = ("FIXME: Rescheduling due to concurrency limits reached at task "
+                   "runtime. Attempt {attempt} of {total}. State set to NONE.").format(
+                attempt=self.try_number % (task.retries + 1) + 1,
+                total=task.retries + 1)
+            logging.warning(hr + msg + hr)
+
+            self.queued_dttm = datetime.now()
+            msg = "Queuing into pool {}".format(self.pool)
+            logging.info(msg)
+            session.merge(self)
             session.commit()
             return
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/07d40d7c/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index 868ea36..867e293 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -289,6 +289,19 @@ class TaskInstanceTest(unittest.TestCase):
         dag >> op5
         self.assertIs(op5.dag, dag)
 
+    @patch.object(DAG, 'concurrency_reached')
+    def test_requeue_over_concurrency(self, mock_concurrency_reached):
+        mock_concurrency_reached.return_value = True
+
+        dag = DAG(dag_id='test_requeue_over_concurrency', start_date=DEFAULT_DATE,
+                  max_active_runs=1, concurrency=2)
+        task = DummyOperator(task_id='test_requeue_over_concurrency_op', dag=dag)
+
+        ti = TI(task=task, execution_date=datetime.datetime.now())
+        ti.run()
+        self.assertEqual(ti.state, models.State.NONE)
+
+
     @patch.object(TI, 'pool_full')
     def test_run_pooling_task(self, mock_pool_full):
         """
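The new test patches `DAG.concurrency_reached` with `patch.object` and sets `return_value`. If `concurrency_reached` is a property (it appears to be one in `airflow/models.py` of this period, but that is worth confirming for the version in use), a plain `patch.object` replaces it with a `MagicMock`; reading the attribute then returns the mock object itself, which is always truthy, so `return_value` is never consulted. The test still passes because it wants a truthy value, but a `False` case could not be expressed that way. The self-contained sketch below contrasts this with `unittest.mock.PropertyMock`; `ToyDag` and `start_or_reschedule` are hypothetical and Airflow is not imported.

```python
import unittest
from unittest import mock


class ToyDag:
    """Hypothetical stand-in; only the shape of concurrency_reached matters."""

    @property
    def concurrency_reached(self):
        raise RuntimeError("would query the metadata database")


def start_or_reschedule(dag):
    # Mirrors the patched branch: over the limit -> reset state to "none".
    return "none" if dag.concurrency_reached else "running"


class ConcurrencyTest(unittest.TestCase):
    def test_property_mock_honours_return_value(self):
        with mock.patch.object(ToyDag, "concurrency_reached",
                               new_callable=mock.PropertyMock) as m:
            m.return_value = False
            self.assertEqual(start_or_reschedule(ToyDag()), "running")
            m.return_value = True
            self.assertEqual(start_or_reschedule(ToyDag()), "none")

    def test_plain_patch_is_always_truthy(self):
        with mock.patch.object(ToyDag, "concurrency_reached") as m:
            m.return_value = False  # ignored: attribute access never calls m
            self.assertEqual(start_or_reschedule(ToyDag()), "none")


if __name__ == "__main__":
    unittest.main()
```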
incubator-airflow git commit: [AIRFLOW-931] Do not set QUEUED in TaskInstances
Repository: incubator-airflow Updated Branches: refs/heads/v1-8-test 3a5a3235d -> 4db8f0796 [AIRFLOW-931] Do not set QUEUED in TaskInstances The contract of TaskInstances stipulates that end states for Tasks can only be UP_FOR_RETRY, SUCCESS, FAILED, UPSTREAM_FAILED or SKIPPED. If concurrency was reached, task instances set themselves to QUEUED. This prevented the scheduler from picking them up again. We now set the state to NONE instead, to ensure integrity. Closes #2127 from bolkedebruin/AIRFLOW-931 (cherry picked from commit e42398100a3248eddb6b511ade73f6a239e58090) Signed-off-by: Bolke de Bruin Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4db8f079 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4db8f079 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4db8f079 Branch: refs/heads/v1-8-test Commit: 4db8f0796642691255b0632d599f33cb9d0ce423 Parents: 3a5a323 Author: Bolke de Bruin Authored: Thu Mar 9 08:32:46 2017 -0800 Committer: Bolke de Bruin Committed: Thu Mar 9 08:32:59 2017 -0800 -- airflow/models.py | 27 ++- tests/models.py | 13 + 2 files changed, 27 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4db8f079/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index ba8d051..62457f0 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1291,19 +1291,20 @@ class TaskInstance(Base): verbose=True) if not runnable and not mark_success: -if self.state != State.QUEUED: -# If a task's dependencies are met but it can't be run yet then queue it -# instead -self.state = State.QUEUED -msg = "Queuing attempt {attempt} of {total}".format( -attempt=self.try_number % (task.retries + 1) + 1, -total=task.retries + 1) -logging.info(hr + msg + hr) - -self.queued_dttm = datetime.now() -msg = "Queuing into pool {}".format(self.pool) -logging.info(msg) 
-session.merge(self) +# FIXME: we might have hit concurrency limits, which means we probably +# have been running prematurely. This should be handled in the +# scheduling mechanism. +self.state = State.NONE +msg = ("FIXME: Rescheduling due to concurrency limits reached at task " + "runtime. Attempt {attempt} of {total}. State set to NONE.").format( +attempt=self.try_number % (task.retries + 1) + 1, +total=task.retries + 1) +logging.warning(hr + msg + hr) + +self.queued_dttm = datetime.now() +msg = "Queuing into pool {}".format(self.pool) +logging.info(msg) +session.merge(self) session.commit() return http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4db8f079/tests/models.py -- diff --git a/tests/models.py b/tests/models.py index 868ea36..867e293 100644 --- a/tests/models.py +++ b/tests/models.py @@ -289,6 +289,19 @@ class TaskInstanceTest(unittest.TestCase): dag >> op5 self.assertIs(op5.dag, dag) +@patch.object(DAG, 'concurrency_reached') +def test_requeue_over_concurrency(self, mock_concurrency_reached): +mock_concurrency_reached.return_value = True + +dag = DAG(dag_id='test_requeue_over_concurrency', start_date=DEFAULT_DATE, + max_active_runs=1, concurrency=2) +task = DummyOperator(task_id='test_requeue_over_concurrency_op', dag=dag) + +ti = TI(task=task, execution_date=datetime.datetime.now()) +ti.run() +self.assertEqual(ti.state, models.State.NONE) + + @patch.object(TI, 'pool_full') def test_run_pooling_task(self, mock_pool_full): """
incubator-airflow git commit: [AIRFLOW-931] Do not set QUEUED in TaskInstances
Repository: incubator-airflow Updated Branches: refs/heads/master abbb4ee5c -> e42398100 [AIRFLOW-931] Do not set QUEUED in TaskInstances The contract of TaskInstances stipulates that end states for Tasks can only be UP_FOR_RETRY, SUCCESS, FAILED, UPSTREAM_FAILED or SKIPPED. If concurrency was reached, task instances set themselves to QUEUED. This prevented the scheduler from picking them up again. We now set the state to NONE instead, to ensure integrity. Closes #2127 from bolkedebruin/AIRFLOW-931 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e4239810 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e4239810 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e4239810 Branch: refs/heads/master Commit: e42398100a3248eddb6b511ade73f6a239e58090 Parents: abbb4ee Author: Bolke de Bruin Authored: Thu Mar 9 08:32:46 2017 -0800 Committer: Bolke de Bruin Committed: Thu Mar 9 08:32:46 2017 -0800 -- airflow/models.py | 27 ++- tests/models.py | 13 + 2 files changed, 27 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4239810/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index 37f8823..b9fda9f 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1288,19 +1288,20 @@ class TaskInstance(Base): verbose=True) if not runnable and not mark_success: -if self.state != State.QUEUED: -# If a task's dependencies are met but it can't be run yet then queue it -# instead -self.state = State.QUEUED -msg = "Queuing attempt {attempt} of {total}".format( -attempt=self.try_number % (task.retries + 1) + 1, -total=task.retries + 1) -logging.info(hr + msg + hr) - -self.queued_dttm = datetime.now() -msg = "Queuing into pool {}".format(self.pool) -logging.info(msg) -session.merge(self) +# FIXME: we might have hit concurrency limits, which means we probably +# have been running 
prematurely. This should be handled in the +# scheduling mechanism. +self.state = State.NONE +msg = ("FIXME: Rescheduling due to concurrency limits reached at task " + "runtime. Attempt {attempt} of {total}. State set to NONE.").format( +attempt=self.try_number % (task.retries + 1) + 1, +total=task.retries + 1) +logging.warning(hr + msg + hr) + +self.queued_dttm = datetime.now() +msg = "Queuing into pool {}".format(self.pool) +logging.info(msg) +session.merge(self) session.commit() return http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4239810/tests/models.py -- diff --git a/tests/models.py b/tests/models.py index d758972..9da6f16 100644 --- a/tests/models.py +++ b/tests/models.py @@ -366,6 +366,19 @@ class TaskInstanceTest(unittest.TestCase): dag >> op5 self.assertIs(op5.dag, dag) +@patch.object(DAG, 'concurrency_reached') +def test_requeue_over_concurrency(self, mock_concurrency_reached): +mock_concurrency_reached.return_value = True + +dag = DAG(dag_id='test_requeue_over_concurrency', start_date=DEFAULT_DATE, + max_active_runs=1, concurrency=2) +task = DummyOperator(task_id='test_requeue_over_concurrency_op', dag=dag) + +ti = TI(task=task, execution_date=datetime.datetime.now()) +ti.run() +self.assertEqual(ti.state, models.State.NONE) + + @patch.object(TI, 'pool_full') def test_run_pooling_task(self, mock_pool_full): """
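The log message in the diff numbers attempts with `self.try_number % (task.retries + 1) + 1`. The helper below evaluates that expression on its own; `attempt_label` is a hypothetical name, not part of Airflow, and the sketch assumes `try_number` counts the tries already made, starting at 0. It shows that the reported attempt wraps back to 1 once `try_number` reaches `retries + 1`.

```python
def attempt_label(try_number: int, retries: int) -> str:
    """Hypothetical helper reproducing the attempt counter from the log message."""
    total = retries + 1
    # try_number % total wraps back to 0 after `total` tries, so the
    # reported attempt cycles through 1..total.
    attempt = try_number % total + 1
    return "Attempt {} of {}".format(attempt, total)


for n in range(5):
    print(n, attempt_label(n, retries=2))
# 0 Attempt 1 of 3
# 1 Attempt 2 of 3
# 2 Attempt 3 of 3
# 3 Attempt 1 of 3
# 4 Attempt 2 of 3
```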
[jira] [Created] (AIRFLOW-957) the execution_date of dagrun that is created by TriggerDagRunOperator is not equal to the execution_date of TriggerDagRunOperator's task instance
Calvin Wang created AIRFLOW-957: --- Summary: the execution_date of dagrun that is created by TriggerDagRunOperator is not equal to the execution_date of TriggerDagRunOperator's task instance Key: AIRFLOW-957 URL: https://issues.apache.org/jira/browse/AIRFLOW-957 Project: Apache Airflow Issue Type: Improvement Components: operators Reporter: Calvin Wang The execution_date of a DagRun created by TriggerDagRunOperator is the time at which it was triggered. When I rerun a DAG whose execution_date is in the past (e.g. 20160101), the execution_date of the external DAG triggered by TriggerDagRunOperator is the current time (e.g. 20170309). I expect it to be the past date (e.g. 20160101). Why didn't TriggerDagRunOperator provide an execution_date option in the first place? Is there anything wrong with this approach?
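The difference between the reported and the expected behaviour can be sketched as a single decision in plain Python. `triggered_execution_date` and its `inherit` flag are hypothetical and are not an existing TriggerDagRunOperator option; the sketch only contrasts the trigger time with the parent task instance's execution_date, using the dates from the issue (the time of day is made up for illustration).

```python
from datetime import datetime


def triggered_execution_date(parent_execution_date: datetime,
                             trigger_time: datetime,
                             inherit: bool) -> datetime:
    """Hypothetical: pick the execution_date for the DagRun a trigger creates."""
    # inherit=False models the reported behaviour (the run is stamped with the
    # trigger time); inherit=True models the reporter's expectation.
    return parent_execution_date if inherit else trigger_time


parent = datetime(2016, 1, 1)             # past run being re-executed
rerun_at = datetime(2017, 3, 9, 10, 0)    # wall-clock time of the rerun
reported = triggered_execution_date(parent, rerun_at, inherit=False)
expected = triggered_execution_date(parent, rerun_at, inherit=True)
print(reported.date())  # 2017-03-09
print(expected.date())  # 2016-01-01
```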