[jira] [Updated] (AIRFLOW-1070) Task instance run should catch SIGINT as well as SIGTERM
[ https://issues.apache.org/jira/browse/AIRFLOW-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Chen updated AIRFLOW-1070: - Priority: Trivial (was: Major) > Task instance run should catch SIGINT as well as SIGTERM > > > Key: AIRFLOW-1070 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1070 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Andrew Chen >Priority: Trivial > > At > https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1369 > we catch only the SIGTERM signal. > It'd be nice to catch SIGINT as well since I'd like my task to be killed when > I send SIGINT to my ``airflow test`` process. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1070) Task instance run should catch SIGINT as well as SIGTERM
Andrew Chen created AIRFLOW-1070: Summary: Task instance run should catch SIGINT as well as SIGTERM Key: AIRFLOW-1070 URL: https://issues.apache.org/jira/browse/AIRFLOW-1070 Project: Apache Airflow Issue Type: New Feature Reporter: Andrew Chen At https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1369 we catch only the SIGTERM signal. It'd be nice to catch SIGINT as well since I'd like my task to be killed when I send SIGINT to my ``airflow test`` process. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
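For reference, treating both signals the same way is a two-line change with the standard library. A minimal sketch, assuming the handler should simply abort the task (the function name is illustrative; Airflow's actual handler at the cited models.py line raises an exception to fail the task instance):

```python
import os
import signal

def on_kill(signum, frame):
    # Illustrative teardown: Airflow's real handler raises an
    # AirflowException so the running task instance is marked failed.
    raise SystemExit(1)

# Registering the same handler for both signals makes Ctrl-C (SIGINT)
# and `kill <pid>` (SIGTERM) behave identically for the task process.
signal.signal(signal.SIGTERM, on_kill)
signal.signal(signal.SIGINT, on_kill)
```

With only the SIGTERM registration, as at the line the ticket points to, a Ctrl-C against `airflow test` does not go through the task's cleanup path.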
[jira] [Closed] (AIRFLOW-1069) Pool slots not obeyed
[ https://issues.apache.org/jira/browse/AIRFLOW-1069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alex Guziel closed AIRFLOW-1069. Resolution: Invalid > Pool slots not obeyed > - > > Key: AIRFLOW-1069 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1069 > Project: Apache Airflow > Issue Type: Bug >Reporter: Alex Guziel >Assignee: Alex Guziel > > Right now, the decrement is done in an incorrect way that is not preserved > across iterations -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1069) Pool slots not obeyed
Alex Guziel created AIRFLOW-1069: Summary: Pool slots not obeyed Key: AIRFLOW-1069 URL: https://issues.apache.org/jira/browse/AIRFLOW-1069 Project: Apache Airflow Issue Type: Bug Reporter: Alex Guziel Assignee: Alex Guziel Right now, the decrement is done in an incorrect way that is not preserved across iterations -- This message was sent by Atlassian JIRA (v6.3.15#6346)
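The report is terse, but the bug class it names is easy to illustrate. A hypothetical sketch, not the scheduler's real code: if the open-slot count is recomputed inside each scheduling iteration instead of carried across iterations, the decrement is lost and the pool over-schedules.

```python
# Illustrative only -- not Airflow's scheduler code. The bug class:
# recomputing free slots inside the loop lets every iteration see the
# original count, so the pool limit is never actually enforced.
def schedule_buggy(tasks, pool_slots):
    scheduled = []
    for task in tasks:
        open_slots = pool_slots          # recomputed: decrement is lost
        if open_slots > 0:
            scheduled.append(task)
    return scheduled

def schedule_fixed(tasks, pool_slots):
    scheduled = []
    open_slots = pool_slots              # carried across iterations
    for task in tasks:
        if open_slots > 0:
            scheduled.append(task)
            open_slots -= 1              # decrement preserved
    return scheduled

print(len(schedule_buggy(["a", "b", "c"], 2)))  # 3: limit ignored
print(len(schedule_fixed(["a", "b", "c"], 2)))  # 2: limit obeyed
```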
[jira] [Updated] (AIRFLOW-1068) Autocommit error with Airflow v1.7.1.3 and pymssql v2.1.3
[ https://issues.apache.org/jira/browse/AIRFLOW-1068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Thomas Christie updated AIRFLOW-1068: - Environment: OS: Centos 6 pymssql: 2.1.3 was:OS: Centos 6 Description: After upgrading pymssql started getting the following error when trying to use the MSSQL hook. [2017-04-04 13:02:27,260] {models.py:1286} ERROR - 'pymssql.Connection' object attribute 'autocommit' is read-only Traceback (most recent call last): File "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 1245, in run result = task_copy.execute(context=context) File "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/operators/mssql_operator.py", line 34, in execute hook.run(self.sql, parameters=self.parameters) File "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py", line 124, in run self.set_autocommit(conn, autocommit) File "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py", line 138, in set_autocommit conn.autocommit = autocommit AttributeError: 'pymssql.Connection' object attribute 'autocommit' is read-only I looked at the dbapi_hook.py file and this is the offending line: def set_autocommit(self, conn, autocommit): conn.autocommit = autocommit Changing the line to: conn.autocommit(autocommit) seems to work. From what I understand, autocommit was a getter/setter method in pymssql versions <2.0.0. Maybe they've reverted the behavior? > Autocommit error with Airflow v1.7.1.3 and pymssql v2.1.3 > - > > Key: AIRFLOW-1068 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1068 > Project: Apache Airflow > Issue Type: Bug > Components: db, hooks >Affects Versions: Airflow 1.7.1.3 > Environment: OS: Centos 6 > pymssql: 2.1.3 >Reporter: Thomas Christie > > After upgrading pymssql started getting the following error when trying to > use the MSSQL hook. 
> [2017-04-04 13:02:27,260] {models.py:1286} ERROR - 'pymssql.Connection' > object attribute 'autocommit' is read-only > Traceback (most recent call last): > File > "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", > line 1245, in run > result = task_copy.execute(context=context) > File > "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/operators/mssql_operator.py", > line 34, in execute > hook.run(self.sql, parameters=self.parameters) > File > "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py", > line 124, in run > self.set_autocommit(conn, autocommit) > File > "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py", > line 138, in set_autocommit > conn.autocommit = autocommit > AttributeError: 'pymssql.Connection' object attribute 'autocommit' is > read-only > I looked at the dbapi_hook.py file and this is the offending line: > def set_autocommit(self, conn, autocommit): > conn.autocommit = autocommit > Changing the line to: > conn.autocommit(autocommit) > seems to work. From what I understand, autocommit was a getter/setter method > in pymssql versions <2.0.0. Maybe they've reverted the behavior? -- This message was sent by Atlassian JIRA (v6.3.15#6346)
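A version-tolerant variant of the reporter's fix would probe for the setter style at runtime instead of hard-coding either form. A sketch, assuming only that some drivers expose `autocommit` as a method and others as a writable attribute; the dummy connection classes below are stand-ins, not real drivers:

```python
# A defensive set_autocommit that works whether the driver exposes
# autocommit as a setter method (as pymssql 2.1.3 does) or as a plain
# attribute (as most DB-API drivers do). Sketch only -- Airflow's
# dbapi_hook may solve this differently.
def set_autocommit(conn, autocommit):
    if callable(getattr(conn, "autocommit", None)):
        conn.autocommit(autocommit)   # method-style setter
    else:
        conn.autocommit = autocommit  # attribute assignment

class MethodStyleConn:
    """Mimics pymssql: autocommit is a setter method."""
    def __init__(self):
        self._autocommit = False
    def autocommit(self, value):
        self._autocommit = value

class AttrStyleConn:
    """Mimics attribute-style drivers: autocommit is writable."""
    autocommit = False

m, a = MethodStyleConn(), AttrStyleConn()
set_autocommit(m, True)
set_autocommit(a, True)
print(m._autocommit, a.autocommit)  # True True
```

A cleaner long-term fix is overriding `set_autocommit` in the MSSQL hook subclass rather than probing in the shared base hook.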
[jira] [Created] (AIRFLOW-1068) Autocommit error with Airflow v1.7.1.3 and pymssql v2.1.3
Thomas Christie created AIRFLOW-1068: Summary: Autocommit error with Airflow v1.7.1.3 and pymssql v2.1.3 Key: AIRFLOW-1068 URL: https://issues.apache.org/jira/browse/AIRFLOW-1068 Project: Apache Airflow Issue Type: Bug Components: db, hooks Affects Versions: Airflow 1.7.1.3 Environment: OS: Centos 6 Reporter: Thomas Christie -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (AIRFLOW-947) Make PrestoHook surface better messages when the Presto Cluster is unavailable.
[ https://issues.apache.org/jira/browse/AIRFLOW-947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Arthur Wiedmer resolved AIRFLOW-947. Resolution: Fixed Fix Version/s: 1.9.0 Issue resolved by pull request #2128 [https://github.com/apache/incubator-airflow/pull/2128] > Make PrestoHook surface better messages when the Presto Cluster is > unavailable. > --- > > Key: AIRFLOW-947 > URL: https://issues.apache.org/jira/browse/AIRFLOW-947 > Project: Apache Airflow > Issue Type: Bug >Reporter: Arthur Wiedmer >Assignee: Arthur Wiedmer >Priority: Minor > Fix For: 1.9.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-947) Make PrestoHook surface better messages when the Presto Cluster is unavailable.
[ https://issues.apache.org/jira/browse/AIRFLOW-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1590#comment-1590 ] ASF subversion and git services commented on AIRFLOW-947: - Commit 6dd4b3bc1f366e1f4f4b42d6781f1caee7a5827a in incubator-airflow's branch refs/heads/master from [~artwr] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=6dd4b3b ] [AIRFLOW-947] Improve exceptions for unavailable Presto cluster This improves error logging when the Presto cluster is unavailable and the underlying error is a 503 http response. This introspects the error to prevent trying to access the 'message' attribute when not present. > Make PrestoHook surface better messages when the Presto Cluster is > unavailable. > --- > > Key: AIRFLOW-947 > URL: https://issues.apache.org/jira/browse/AIRFLOW-947 > Project: Apache Airflow > Issue Type: Bug >Reporter: Arthur Wiedmer >Assignee: Arthur Wiedmer >Priority: Minor > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
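The defensive pattern the commit message describes, introspecting the error before touching its `message` attribute, can be sketched as follows (the function and class names are illustrative, not the PrestoHook's actual code):

```python
# When the Presto cluster answers with a raw 503 page, the raised error
# object has no parsed 'message' attribute, so reach for it only if it
# is actually there instead of crashing with an AttributeError.
def format_presto_error(exc):
    message = getattr(exc, "message", None)
    if message is not None:
        return "Presto error: {}".format(message)
    return "Presto cluster unavailable or unexpected response: {!r}".format(exc)

class ParsedError(Exception):
    """Stand-in for an error that carries a parsed Presto message."""
    def __init__(self, message):
        self.message = message

print(format_presto_error(ParsedError("line 1:1: mismatched input")))
print(format_presto_error(Exception("503 Service Unavailable")))
```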
[2/2] incubator-airflow git commit: Merge pull request #2128 from artwr/artwr-improve_presto_hook_error_when_cluster_is_unavailable
Merge pull request #2128 from artwr/artwr-improve_presto_hook_error_when_cluster_is_unavailable Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f5462c78 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f5462c78 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f5462c78 Branch: refs/heads/master Commit: f5462c78ff38ec59ec30c688097ff5bb3b3541bb Parents: 70f1bf1 6dd4b3b Author: Arthur WiedmerAuthored: Tue Apr 4 11:20:54 2017 -0700 Committer: Arthur Wiedmer Committed: Tue Apr 4 11:20:54 2017 -0700 -- airflow/hooks/presto_hook.py | 26 -- 1 file changed, 16 insertions(+), 10 deletions(-) --
[jira] [Commented] (AIRFLOW-1060) dag lost tracking the status of tasks and stuck in running state
[ https://issues.apache.org/jira/browse/AIRFLOW-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955370#comment-15955370 ] Serhii commented on AIRFLOW-1060: - I have the same issue. It is 100% reproducible on versions 1.7.1.2 and 1.7.1.3. If a task takes more than 1 hour, the subdag containing it gets stuck in the running state. In the logs for SubDagOperator I found a mysterious error: [2017-04-04 15:04:35,076] {jobs.py:965} ERROR - The airflow run command failed at reporting an error. This should not occur in normal circumstances. Task state is 'running', reported state is 'success'. TI is > dag lost tracking the status of tasks and stuck in running state > > > Key: AIRFLOW-1060 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1060 > Project: Apache Airflow > Issue Type: Bug >Reporter: Jeff Liu > > I'm running airflow 1.7.1 in one of my environments and constantly run into > an issue with the main dag status stuck in the "running" state, while the tasks > have all completed successfully. > To resolve the issue, I had to "delete" the dag entry in the airflow UI and > re-run the job manually so the dag can recognize that the tasks are all > completed and set itself to successful after the re-run. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1067] use example.com in examples
Repository: incubator-airflow Updated Branches: refs/heads/master 8fdfb16cc -> 70f1bf10a [AIRFLOW-1067] use example.com in examples We use airf...@airflow.com in examples. However, https://airflow.com is owned by a company named Airflow (selling fans, etc). We should use airf...@example.com instead. That domain is created for this purpose. Closes #2217 from mengxr/AIRFLOW-1067 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/70f1bf10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/70f1bf10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/70f1bf10 Branch: refs/heads/master Commit: 70f1bf10a5a5ab8f7460d3c0dc5c1a6d955355de Parents: 8fdfb16 Author: Xiangrui MengAuthored: Tue Apr 4 09:22:37 2017 -0700 Committer: Arthur Wiedmer Committed: Tue Apr 4 09:22:37 2017 -0700 -- airflow/api/auth/backend/default.py| 2 +- airflow/config_templates/default_airflow.cfg | 2 +- airflow/config_templates/default_test.cfg | 2 +- .../example_dags/example_emr_job_flow_automatic_steps.py | 2 +- .../contrib/example_dags/example_emr_job_flow_manual_steps.py | 2 +- airflow/contrib/example_dags/example_qubole_operator.py| 2 +- airflow/contrib/example_dags/example_twitter_dag.py| 2 +- airflow/contrib/task_runner/__init__.py| 2 +- airflow/dag/__init__.py| 2 +- airflow/example_dags/docker_copy_data.py | 2 +- airflow/example_dags/example_docker_operator.py| 2 +- airflow/example_dags/example_http_operator.py | 2 +- airflow/example_dags/tutorial.py | 2 +- docs/scheduler.rst | 2 +- docs/tutorial.rst | 6 +++--- scripts/ci/airflow_travis.cfg | 2 +- tests/dags/test_retry_handling_job.py | 2 +- 17 files changed, 19 insertions(+), 19 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70f1bf10/airflow/api/auth/backend/default.py -- diff --git a/airflow/api/auth/backend/default.py b/airflow/api/auth/backend/default.py index 64cae86..49453ea 100644 --- 
a/airflow/api/auth/backend/default.py +++ b/airflow/api/auth/backend/default.py @@ -26,4 +26,4 @@ def requires_authentication(function): def decorated(*args, **kwargs): return function(*args, **kwargs) -return decorated \ No newline at end of file +return decorated http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70f1bf10/airflow/config_templates/default_airflow.cfg -- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 77c65ca..b28256a 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -231,7 +231,7 @@ smtp_ssl = False # smtp_user = airflow # smtp_password = airflow smtp_port = 25 -smtp_mail_from = airf...@airflow.com +smtp_mail_from = airf...@example.com [celery] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70f1bf10/airflow/config_templates/default_test.cfg -- diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index 2d31141..2fb5bb0 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -65,7 +65,7 @@ smtp_host = localhost smtp_user = airflow smtp_port = 25 smtp_password = airflow -smtp_mail_from = airf...@airflow.com +smtp_mail_from = airf...@example.com [celery] celery_app_name = airflow.executors.celery_executor http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70f1bf10/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py -- diff --git a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py index 7f57ad1..b03b36f 100644 --- a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py +++ b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py @@ -22,7 +22,7 @@ DEFAULT_ARGS = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': airflow.utils.dates.days_ago(2), -'email': 
['airf...@airflow.com'], +'email':
[jira] [Commented] (AIRFLOW-1067) Should not use airf...@airflow.com in examples
[ https://issues.apache.org/jira/browse/AIRFLOW-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955329#comment-15955329 ] Arthur Wiedmer commented on AIRFLOW-1067: - Duplicate of https://issues.apache.org/jira/browse/AIRFLOW-1066 We had the same idea. > Should not use airf...@airflow.com in examples > -- > > Key: AIRFLOW-1067 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1067 > Project: Apache Airflow > Issue Type: Bug >Reporter: Xiangrui Meng >Assignee: Xiangrui Meng >Priority: Minor > > airflow.com is owned by a company named Airflow (selling fans, etc). We > should use airf...@example.com in all examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Work started] (AIRFLOW-1067) Should not use airf...@airflow.com in examples
[ https://issues.apache.org/jira/browse/AIRFLOW-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-1067 started by Xiangrui Meng. -- > Should not use airf...@airflow.com in examples > -- > > Key: AIRFLOW-1067 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1067 > Project: Apache Airflow > Issue Type: Bug >Reporter: Xiangrui Meng >Assignee: Xiangrui Meng >Priority: Minor > > airflow.com is owned by a company named Airflow (selling fans, etc). We > should use airf...@example.com in all examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1067) Should not use airf...@airflow.com in examples
[ https://issues.apache.org/jira/browse/AIRFLOW-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955302#comment-15955302 ] Xiangrui Meng commented on AIRFLOW-1067: https://github.com/apache/incubator-airflow/pull/2217 > Should not use airf...@airflow.com in examples > -- > > Key: AIRFLOW-1067 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1067 > Project: Apache Airflow > Issue Type: Bug >Reporter: Xiangrui Meng >Assignee: Xiangrui Meng >Priority: Minor > > airflow.com is owned by a company named Airflow (selling fans, etc). We > should use airf...@example.com in all examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1067) Should not use airf...@airflow.com in examples
Xiangrui Meng created AIRFLOW-1067: -- Summary: Should not use airf...@airflow.com in examples Key: AIRFLOW-1067 URL: https://issues.apache.org/jira/browse/AIRFLOW-1067 Project: Apache Airflow Issue Type: Bug Reporter: Xiangrui Meng Assignee: Xiangrui Meng Priority: Minor airflow.com is owned by a company named Airflow (selling fans, etc). We should use airf...@example.com in all examples. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1066) Replace instances of airf...@airflow.com with airf...@example.com
Arthur Wiedmer created AIRFLOW-1066: --- Summary: Replace instances of airf...@airflow.com with airf...@example.com Key: AIRFLOW-1066 URL: https://issues.apache.org/jira/browse/AIRFLOW-1066 Project: Apache Airflow Issue Type: Bug Reporter: Arthur Wiedmer Assignee: Arthur Wiedmer Priority: Trivial airflow.com is a website registered to a company selling fans :) We can use example.com as a domain name. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1064] Change default sort to job_id for TaskInstanceModelView
Repository: incubator-airflow Updated Branches: refs/heads/master 4a6bef69d -> 8fdfb16cc [AIRFLOW-1064] Change default sort to job_id for TaskInstanceModelView The TaskInstanceModelView default sort column is on an unindexed column. We shouldn't need an index on start_date, and job_id is just as logical of a default sort. Closes #2215 from saguziel/aguziel-fix-ti-page Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8fdfb16c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8fdfb16c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8fdfb16c Branch: refs/heads/master Commit: 8fdfb16cc3c0903edd8b89b836f5bdf8bf371ce3 Parents: 4a6bef6 Author: Alex GuzielAuthored: Tue Apr 4 17:19:43 2017 +0200 Committer: Bolke de Bruin Committed: Tue Apr 4 17:19:43 2017 +0200 -- airflow/www/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fdfb16c/airflow/www/views.py -- diff --git a/airflow/www/views.py b/airflow/www/views.py index a9bab31..3973866 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2336,7 +2336,7 @@ class TaskInstanceModelView(ModelViewOnly): queued_dttm=datetime_f, dag_id=dag_link, duration=duration_f) column_searchable_list = ('dag_id', 'task_id', 'state') -column_default_sort = ('start_date', True) +column_default_sort = ('job_id', True) form_choices = { 'state': [ ('success', 'success'),
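The rationale behind the change, that sorting on an unindexed column forces the database to build a separate sort pass while an indexed column can be read in order, shows up directly in a query plan. A SQLite sketch (the schema is illustrative; Airflow's real task_instance table differs):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance (job_id INTEGER PRIMARY KEY, start_date TEXT)")

def plan(sql):
    # Collect the detail column of each EXPLAIN QUERY PLAN row.
    return " ".join(row[-1] for row in conn.execute("EXPLAIN QUERY PLAN " + sql))

# Sorting on the unindexed column needs a temporary sort structure...
print(plan("SELECT * FROM task_instance ORDER BY start_date"))
# ...while the indexed (primary key) column can be scanned in order.
print(plan("SELECT * FROM task_instance ORDER BY job_id"))
```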
[3/4] incubator-airflow git commit: Merge branch 'AIRFLOW-719' into AIRFLOW-719-3
Merge branch 'AIRFLOW-719' into AIRFLOW-719-3 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15fd4d98 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15fd4d98 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15fd4d98 Branch: refs/heads/master Commit: 15fd4d98d141766f81552d270c8b5c43b15f4f44 Parents: f2dae7d eb705fd Author: Bolke de BruinAuthored: Tue Apr 4 11:55:20 2017 +0200 Committer: Bolke de Bruin Committed: Tue Apr 4 11:55:20 2017 +0200 -- airflow/operators/latest_only_operator.py | 30 +++- airflow/operators/python_operator.py | 82 +++--- airflow/ti_deps/deps/trigger_rule_dep.py | 6 +- scripts/ci/requirements.txt | 1 + tests/dags/test_dagrun_short_circuit_false.py | 38 - tests/models.py | 77 +- tests/operators/__init__.py | 2 + tests/operators/latest_only_operator.py | 2 +- tests/operators/python_operator.py| 167 - 9 files changed, 301 insertions(+), 104 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15fd4d98/tests/models.py -- diff --cc tests/models.py index 43fccca,3e77894..a013f8a --- a/tests/models.py +++ b/tests/models.py @@@ -223,59 -220,10 +220,43 @@@ class DagRunTest(unittest.TestCase) def test_id_for_date(self): run_id = models.DagRun.id_for_date( datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None)) -self.assertEqual('scheduled__2015-01-02T03:04:05', run_id, - msg='Generated run_id did not match expectations: {0}' - .format(run_id)) +self.assertEqual( +'scheduled__2015-01-02T03:04:05', run_id, +'Generated run_id did not match expectations: {0}'.format(run_id)) + +def test_dagrun_find(self): +session = settings.Session() +now = datetime.datetime.now() + +dag_id1 = "test_dagrun_find_externally_triggered" +dag_run = models.DagRun( +dag_id=dag_id1, +run_id='manual__' + now.isoformat(), +execution_date=now, +start_date=now, +state=State.RUNNING, +external_trigger=True, +) +session.add(dag_run) + 
+dag_id2 = "test_dagrun_find_not_externally_triggered" +dag_run = models.DagRun( +dag_id=dag_id2, +run_id='manual__' + now.isoformat(), +execution_date=now, +start_date=now, +state=State.RUNNING, +external_trigger=False, +) +session.add(dag_run) + +session.commit() + +self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True))) +self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False))) +self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True))) +self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False))) - def test_dagrun_running_when_upstream_skipped(self): - """ - Tests that a DAG run is not failed when an upstream task is skipped - """ - initial_task_states = { - 'test_short_circuit_false': State.SUCCESS, - 'test_state_skipped1': State.SKIPPED, - 'test_state_skipped2': State.NONE, - } - # dags/test_dagrun_short_circuit_false.py - dag_run = self.create_dag_run('test_dagrun_short_circuit_false', - state=State.RUNNING, - task_states=initial_task_states) - updated_dag_state = dag_run.update_state() - self.assertEqual(State.RUNNING, updated_dag_state) - def test_dagrun_success_when_all_skipped(self): """ Tests that a DAG run succeeds when all tasks are skipped
[4/4] incubator-airflow git commit: Merge pull request #2195 from bolkedebruin/AIRFLOW-719
Merge pull request #2195 from bolkedebruin/AIRFLOW-719 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4a6bef69 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4a6bef69 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4a6bef69 Branch: refs/heads/master Commit: 4a6bef69d1817a5fc3ddd6ffe14c2578eaa49cf0 Parents: f2dae7d 15fd4d9 Author: Bolke de BruinAuthored: Tue Apr 4 17:04:12 2017 +0200 Committer: Bolke de Bruin Committed: Tue Apr 4 17:04:12 2017 +0200 -- airflow/operators/latest_only_operator.py | 30 +++- airflow/operators/python_operator.py | 82 +++--- airflow/ti_deps/deps/trigger_rule_dep.py | 6 +- scripts/ci/requirements.txt | 1 + tests/dags/test_dagrun_short_circuit_false.py | 38 - tests/models.py | 77 +- tests/operators/__init__.py | 2 + tests/operators/latest_only_operator.py | 2 +- tests/operators/python_operator.py| 167 - 9 files changed, 301 insertions(+), 104 deletions(-) --
[1/4] incubator-airflow git commit: Revert "[AIRFLOW-719] Prevent DAGs from ending prematurely"
Repository: incubator-airflow Updated Branches: refs/heads/master f2dae7d15 -> 4a6bef69d Revert "[AIRFLOW-719] Prevent DAGs from ending prematurely" This reverts commit 1fdcf2480555f06cce3fc9bba97fbf3d64f074d3. This reinstates the previous logic (< 1.8.0) that ALL_SUCCESS requires all tasks to be successful instead of also counting SKIPPED tasks as part of the successful tasks. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92965e82 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92965e82 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92965e82 Branch: refs/heads/master Commit: 92965e8275c6f2ec2282ad46c09950bab10c1cb2 Parents: 4c09050 Author: Bolke de BruinAuthored: Mon Mar 27 20:12:29 2017 -0700 Committer: Bolke de Bruin Committed: Tue Mar 28 17:42:48 2017 -0700 -- airflow/ti_deps/deps/trigger_rule_dep.py | 6 +- tests/dags/test_dagrun_short_circuit_false.py | 38 -- tests/models.py | 83 +++--- 3 files changed, 46 insertions(+), 81 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/airflow/ti_deps/deps/trigger_rule_dep.py -- diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py index 3a77b00..cf06c0b 100644 --- a/airflow/ti_deps/deps/trigger_rule_dep.py +++ b/airflow/ti_deps/deps/trigger_rule_dep.py @@ -135,7 +135,7 @@ class TriggerRuleDep(BaseTIDep): if tr == TR.ALL_SUCCESS: if upstream_failed or failed: ti.set_state(State.UPSTREAM_FAILED, session) -elif skipped == upstream: +elif skipped: ti.set_state(State.SKIPPED, session) elif tr == TR.ALL_FAILED: if successes or skipped: @@ -148,7 +148,7 @@ class TriggerRuleDep(BaseTIDep): ti.set_state(State.SKIPPED, session) if tr == TR.ONE_SUCCESS: -if successes <= 0 and skipped <= 0: +if successes <= 0: yield self._failing_status( reason="Task's trigger rule '{0}' requires one upstream " "task success, but none 
were found. " @@ -162,7 +162,7 @@ class TriggerRuleDep(BaseTIDep): "upstream_tasks_state={1}, upstream_task_ids={2}" .format(tr, upstream_tasks_state, task.upstream_task_ids)) elif tr == TR.ALL_SUCCESS: -num_failures = upstream - (successes + skipped) +num_failures = upstream - successes if num_failures > 0: yield self._failing_status( reason="Task's trigger rule '{0}' requires all upstream " http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/tests/dags/test_dagrun_short_circuit_false.py -- diff --git a/tests/dags/test_dagrun_short_circuit_false.py b/tests/dags/test_dagrun_short_circuit_false.py deleted file mode 100644 index 805ab67..000 --- a/tests/dags/test_dagrun_short_circuit_false.py +++ /dev/null @@ -1,38 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -from datetime import datetime - -from airflow.models import DAG -from airflow.operators.python_operator import ShortCircuitOperator -from airflow.operators.dummy_operator import DummyOperator - - -# DAG that has its short circuit op fail and skip multiple downstream tasks -dag = DAG( -dag_id='test_dagrun_short_circuit_false', -start_date=datetime(2017, 1, 1) -) -dag_task1 = ShortCircuitOperator( -task_id='test_short_circuit_false', -dag=dag, -python_callable=lambda: False) -dag_task2 = DummyOperator( -task_id='test_state_skipped1', -dag=dag) -dag_task3 = DummyOperator( -task_id='test_state_skipped2', -dag=dag) -dag_task1.set_downstream(dag_task2) -dag_task2.set_downstream(dag_task3) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/tests/models.py -- diff --git
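The semantic difference the revert restores is easiest to see as arithmetic. A simplified sketch of the two ALL_SUCCESS failure counts from the trigger_rule_dep.py diff above, reduced to bare functions:

```python
# Simplified from the diff: under the reverted 1.8.0 logic a SKIPPED
# upstream task counted toward ALL_SUCCESS; after the revert only true
# successes count, and any upstream skip propagates a skip instead
# (`elif skipped:` rather than `elif skipped == upstream:`).
def num_failures_pre_revert(upstream, successes, skipped):
    return upstream - (successes + skipped)  # skips counted as "good"

def num_failures_post_revert(upstream, successes, skipped):
    return upstream - successes              # skips no longer counted

# 3 upstream tasks: 2 succeeded, 1 skipped.
print(num_failures_pre_revert(3, 2, 1))   # 0 -> downstream may run
print(num_failures_post_revert(3, 2, 1))  # 1 -> downstream does not run
```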
[2/4] incubator-airflow git commit: [AIRFLOW-719] Fix race condition in ShortCircuit, Branch and LatestOnly
[AIRFLOW-719] Fix race condition in ShortCircuit, Branch and LatestOnly The ShortCircuitOperator, BranchPythonOperator and LatestOnlyOperator were arbitrarily changing the states of TaskInstances without locking them in the database. As the scheduler checks the state of dag runs asynchronously, the dag run state could be set to failed while the operators are updating the downstream tasks. A better fix would be to use the dag run itself in the context of the Operator. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb705fd5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb705fd5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb705fd5 Branch: refs/heads/master Commit: eb705fd55c30cea778282140d927f51b4a649c73 Parents: 92965e8 Author: Bolke de Bruin Authored: Tue Mar 28 16:29:39 2017 -0700 Committer: Bolke de Bruin Committed: Mon Apr 3 10:38:12 2017 +0200 -- airflow/operators/latest_only_operator.py | 30 - airflow/operators/python_operator.py | 82 +--- scripts/ci/requirements.txt | 1 + tests/operators/__init__.py | 2 + tests/operators/latest_only_operator.py | 2 +- tests/operators/python_operator.py| 167 - 6 files changed, 258 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/airflow/operators/latest_only_operator.py -- diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py index 8b4e614..9d5defb 100644 --- a/airflow/operators/latest_only_operator.py +++ b/airflow/operators/latest_only_operator.py @@ -34,7 +34,7 @@ class LatestOnlyOperator(BaseOperator): def execute(self, context): # If the DAG Run is externally triggered, then return without # skipping downstream tasks -if context['dag_run'].external_trigger: +if context['dag_run'] and context['dag_run'].external_trigger: logging.info("""Externally triggered DAG_Run: allowing execution to
proceed.""") return @@ -46,17 +46,39 @@ class LatestOnlyOperator(BaseOperator): logging.info( 'Checking latest only with left_window: %s right_window: %s ' 'now: %s', left_window, right_window, now) + if not left_window < now <= right_window: logging.info('Not latest execution, skipping downstream.') session = settings.Session() -for task in context['task'].downstream_list: -ti = TaskInstance( -task, execution_date=context['ti'].execution_date) + +TI = TaskInstance +tis = session.query(TI).filter( +TI.execution_date == context['ti'].execution_date, +TI.task_id.in_(context['task'].downstream_task_ids) +).with_for_update().all() + +for ti in tis: logging.info('Skipping task: %s', ti.task_id) ti.state = State.SKIPPED ti.start_date = now ti.end_date = now session.merge(ti) + +# this is defensive against dag runs that are not complete +for task in context['task'].downstream_list: +if task.task_id in tis: +continue + +logging.warning("Task {} was not part of a dag run. " +"This should not happen." +.format(task)) +now = datetime.datetime.now() +ti = TaskInstance(task, execution_date=context['ti'].execution_date) +ti.state = State.SKIPPED +ti.start_date = now +ti.end_date = now +session.merge(ti) + session.commit() session.close() logging.info('Done.') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/airflow/operators/python_operator.py -- diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index a17e6fa..cf240f2 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -109,14 +109,36 @@ class BranchPythonOperator(PythonOperator): logging.info("Following branch " + branch) logging.info("Marking other directly downstream tasks as skipped") session = settings.Session() + +TI = TaskInstance +tis = session.query(TI).filter( +TI.execution_date ==
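The heart of the fix above is taking a row lock (SQLAlchemy's `with_for_update()`, i.e. `SELECT ... FOR UPDATE`) before the read-modify-write on downstream task instances. SQLite has no `FOR UPDATE`, but the same lock-before-read idea can be sketched with an immediate transaction (the schema and state values are illustrative, not Airflow's):

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
conn.execute("INSERT INTO task_instance VALUES ('downstream_a', 'none')")

# Read-modify-write under a write lock: BEGIN IMMEDIATE takes the
# database write lock up front, so no other connection can change the
# rows between our SELECT and our UPDATE -- the role with_for_update()
# plays in the actual fix.
conn.execute("BEGIN IMMEDIATE")
rows = conn.execute(
    "SELECT task_id FROM task_instance WHERE state = 'none'").fetchall()
for (task_id,) in rows:
    conn.execute(
        "UPDATE task_instance SET state = 'skipped' WHERE task_id = ?",
        (task_id,))
conn.execute("COMMIT")

print(conn.execute("SELECT state FROM task_instance").fetchone()[0])  # skipped
```

Without the up-front lock, a concurrent scheduler pass could fail the dag run between the read and the write, which is exactly the race the commit describes.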
[jira] [Assigned] (AIRFLOW-1065) Add functionality for Azure Blob Storage
[ https://issues.apache.org/jira/browse/AIRFLOW-1065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henk Griffioen reassigned AIRFLOW-1065: --- Assignee: Henk Griffioen > Add functionality for Azure Blob Storage > > > Key: AIRFLOW-1065 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1065 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Henk Griffioen >Assignee: Henk Griffioen > > Currently Airflow has sensors and operators for S3 and GCE but it does not > support Azure Blob Storage. > A hook would interface with Azure Blob storage via the Python library > azure-storage over the wasb protocol. Sensors use the hook to detect if a > blob has landed on a container and operators use it to move files to the blob > storage. > The design for the hook airflow.contrib.hooks.WasbHook would mimic > airflow.operators.S3_hook.S3Hook. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AIRFLOW-1065) Add functionality for Azure Blob Storage
[ https://issues.apache.org/jira/browse/AIRFLOW-1065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Henk Griffioen updated AIRFLOW-1065: Description: Currently Airflow has sensors and operators for S3 and GCE but it does not support Azure Blob Storage. A hook would interface with Azure Blob storage via the Python library azure-storage over the wasb protocol. Sensors use the hook to detect if a blob has landed on a container and operators use it to move files to the blob storage. The design for the hook airflow.contrib.hooks.WasbHook would mimic airflow.operators.S3_hook.S3Hook. was: Currently Airflow has sensors and operators for S3 and GCE but it does not support Azure Blob Storage. A hook would interface with Azure Blob storage via the Python library azure-storage over the wasbs protocol. Sensors use the hook to detect if a blob has landed on a container and operators use it to move files to the blob storage. The design for the hook airflow.contrib.hooks.WasbsHook would mimic airflow.operators.S3_hook.S3Hook. > Add functionality for Azure Blob Storage > > > Key: AIRFLOW-1065 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1065 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Henk Griffioen > > Currently Airflow has sensors and operators for S3 and GCE but it does not > support Azure Blob Storage. > A hook would interface with Azure Blob storage via the Python library > azure-storage over the wasb protocol. Sensors use the hook to detect if a > blob has landed on a container and operators use it to move files to the blob > storage. > The design for the hook airflow.contrib.hooks.WasbHook would mimic > airflow.operators.S3_hook.S3Hook. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1065) Add functionality for Azure Blob Storage
Henk Griffioen created AIRFLOW-1065: --- Summary: Add functionality for Azure Blob Storage Key: AIRFLOW-1065 URL: https://issues.apache.org/jira/browse/AIRFLOW-1065 Project: Apache Airflow Issue Type: New Feature Reporter: Henk Griffioen Currently Airflow has sensors and operators for S3 and GCE but it does not support Azure Blob Storage. A hook would interface with Azure Blob storage via the Python library azure-storage over the wasbs protocol. Sensors use the hook to detect if a blob has landed on a container and operators use it to move files to the blob storage. The design for the hook airflow.contrib.hooks.WasbsHook would mimic airflow.operators.S3_hook.S3Hook. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
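The proposed `airflow.contrib.hooks.WasbHook` would wrap the azure-storage client the way `S3Hook` wraps boto. A rough sketch of that surface, with the client injected so the shape can be shown without the Azure SDK (the class and method names below are hypothetical, not the eventual hook's API):

```python
class WasbHookSketch:
    """Hypothetical sketch of a hook for Azure Blob Storage over wasb.

    The real hook would construct an azure-storage blob client from an
    Airflow connection; here one is injected so this stays self-contained.
    """

    def __init__(self, client):
        self.client = client  # assumed to expose exists() and upload()

    def check_for_blob(self, container, blob):
        # what a sensor would poke: has the blob landed on the container?
        return self.client.exists(container, blob)

    def load_file(self, path, container, blob):
        # what an upload operator would call to move a file to blob storage
        self.client.upload(container, blob, path)

class FakeBlobClient:
    """In-memory stand-in for the azure-storage client."""
    def __init__(self):
        self.blobs = {}
    def exists(self, container, blob):
        return (container, blob) in self.blobs
    def upload(self, container, blob, path):
        self.blobs[(container, blob)] = path

hook = WasbHookSketch(FakeBlobClient())
hook.load_file("/tmp/data.csv", "landing", "data.csv")
found = hook.check_for_blob("landing", "data.csv")
```

Keeping the client behind a small hook interface is what lets sensors and operators share one connection implementation, as `S3Hook` does for S3.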
[jira] [Resolved] (AIRFLOW-1030) HttpHook error when creating HttpSensor
[ https://issues.apache.org/jira/browse/AIRFLOW-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-1030. - Resolution: Fixed Fix Version/s: 1.8.1 Issue resolved by pull request #2180 [https://github.com/apache/incubator-airflow/pull/2180] > HttpHook error when creating HttpSensor > --- > > Key: AIRFLOW-1030 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1030 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: Airflow 1.8 > Environment: python3 >Reporter: Paulius Dambrauskas >Assignee: Paulius Dambrauskas > Fix For: 1.8.1 > > > Task: > {code} > sensor = HttpSensor( > task_id='http_sensor_check', > http_conn_id='http_default', > endpoint='', > params={}, > poke_interval=5, > dag=dag > ) > {code} > Exception > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/models.py", > line 268, in process_file > m = imp.load_source(mod_name, filepath) > File "/usr/lib/python3.5/imp.py", line 172, in load_source > module = _load(spec) > File "", line 693, in _load > File "", line 673, in _load_unlocked > File "", line 665, in exec_module > File "", line 222, in _call_with_frames_removed > File "/home/paulius/airflow/dags/cpg_4.py", line 43, in > dag=dag) > File > "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/utils/decorators.py", > line 86, in wrapper > result = func(*args, **kwargs) > File > "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/operators/sensors.py", > line 663, in __init__ > self.hook = hooks.http_hook.HttpHook(method='GET', > http_conn_id=http_conn_id) > File > "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/utils/helpers.py", > line 436, in __getattr__ > raise AttributeError > AttributeError > {code} -- This message was sent by Atlassian 
JIRA (v6.3.15#6346)
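The bare `AttributeError` in the traceback comes from a lazy-attribute namespace: `airflow.hooks` is wrapped in an object whose `__getattr__` only resolves registered hook modules, so `hooks.http_hook` failed until the import registration was fixed. A minimal pure-Python model of that pattern (inferred from the stack trace; `LazyNamespace` is an illustrative name, not Airflow's class):

```python
class LazyNamespace:
    """Resolves attribute access against a registry of loaded modules."""

    def __init__(self, registry):
        self._registry = registry  # name -> already-imported object

    def __getattr__(self, name):
        # Only called for names not found normally; unknown names raise
        # AttributeError, just as in the reported traceback.
        try:
            return self._registry[name]
        except KeyError:
            raise AttributeError(name)

hooks = LazyNamespace({"base_hook": object()})

ok = hooks.base_hook is not None  # registered name resolves

try:
    hooks.http_hook  # unregistered name, like the bug report
    missing = False
except AttributeError:
    missing = True
```

The fix in PR #2180 was on the registration/import side; the lesson for the pattern itself is to raise `AttributeError(name)` with the name included, so the failure is diagnosable.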
[jira] [Commented] (AIRFLOW-1) Migrate GitHub code to Apache git
[ https://issues.apache.org/jira/browse/AIRFLOW-1?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954650#comment-15954650 ] ASF subversion and git services commented on AIRFLOW-1: --- Commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d in incubator-airflow's branch refs/heads/master from pdambrauskas [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=f2dae7d ] [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor Closes #2180 from pdambrauskas/fix/http_hook_import > Migrate GitHub code to Apache git > - > > Key: AIRFLOW-1 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1 > Project: Apache Airflow > Issue Type: Improvement > Components: project-management >Reporter: Maxime Beauchemin >Assignee: Maxime Beauchemin > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1) Migrate GitHub code to Apache git
[ https://issues.apache.org/jira/browse/AIRFLOW-1?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954652#comment-15954652 ] ASF subversion and git services commented on AIRFLOW-1: --- Commit 4db53f39a972cae691dc49687a407dda0ff49aaf in incubator-airflow's branch refs/heads/v1-8-test from pdambrauskas [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4db53f3 ] [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor Closes #2180 from pdambrauskas/fix/http_hook_import (cherry picked from commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d) Signed-off-by: Bolke de Bruin> Migrate GitHub code to Apache git > - > > Key: AIRFLOW-1 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1 > Project: Apache Airflow > Issue Type: Improvement > Components: project-management >Reporter: Maxime Beauchemin >Assignee: Maxime Beauchemin > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1030) HttpHook error when creating HttpSensor
[ https://issues.apache.org/jira/browse/AIRFLOW-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954651#comment-15954651 ] ASF subversion and git services commented on AIRFLOW-1030: -- Commit 4db53f39a972cae691dc49687a407dda0ff49aaf in incubator-airflow's branch refs/heads/v1-8-test from pdambrauskas [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4db53f3 ] [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor Closes #2180 from pdambrauskas/fix/http_hook_import (cherry picked from commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d) Signed-off-by: Bolke de Bruin> HttpHook error when creating HttpSensor > --- > > Key: AIRFLOW-1030 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1030 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: Airflow 1.8 > Environment: python3 >Reporter: Paulius Dambrauskas >Assignee: Paulius Dambrauskas > > Task: > {code} > sensor = HttpSensor( > task_id='http_sensor_check', > http_conn_id='http_default', > endpoint='', > params={}, > poke_interval=5, > dag=dag > ) > {code} > Exception > {code} > Traceback (most recent call last): > File > "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/models.py", > line 268, in process_file > m = imp.load_source(mod_name, filepath) > File "/usr/lib/python3.5/imp.py", line 172, in load_source > module = _load(spec) > File "", line 693, in _load > File "", line 673, in _load_unlocked > File "", line 665, in exec_module > File "", line 222, in _call_with_frames_removed > File "/home/paulius/airflow/dags/cpg_4.py", line 43, in > dag=dag) > File > "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/utils/decorators.py", > line 86, in wrapper > result = func(*args, **kwargs) > File > "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/operators/sensors.py", > line 663, in __init__ > self.hook = 
hooks.http_hook.HttpHook(method='GET', > http_conn_id=http_conn_id) > File > "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/utils/helpers.py", > line 436, in __getattr__ > raise AttributeError > AttributeError > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (AIRFLOW-1051) Add a test for resetdb to CliTests
[ https://issues.apache.org/jira/browse/AIRFLOW-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-1051. - Resolution: Fixed Fix Version/s: 1.9.0 Issue resolved by pull request #2198 [https://github.com/apache/incubator-airflow/pull/2198] > Add a test for resetdb to CliTests > -- > > Key: AIRFLOW-1051 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1051 > Project: Apache Airflow > Issue Type: Test > Components: tests >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Minor > Fix For: 1.9.0 > > > CliTests lacks a test for resetdb command for now. It should be added. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1051) Add a test for resetdb to CliTests
[ https://issues.apache.org/jira/browse/AIRFLOW-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954642#comment-15954642 ] ASF subversion and git services commented on AIRFLOW-1051: -- Commit 15aee05dd7104716b22ea7b01b220f9eaea3a72a in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=15aee05 ] [AIRFLOW-1051] Add a test for resetdb to CliTests CliTests lacks a test for resetdb command for now. It should be added. Closes #2198 from sekikn/AIRFLOW-1051 > Add a test for resetdb to CliTests > -- > > Key: AIRFLOW-1051 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1051 > Project: Apache Airflow > Issue Type: Test > Components: tests >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Minor > > CliTests lacks a test for resetdb command for now. It should be added. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1051] Add a test for resetdb to CliTests
Repository: incubator-airflow
Updated Branches:
  refs/heads/master a9b20a04b -> 15aee05dd

[AIRFLOW-1051] Add a test for resetdb to CliTests

CliTests lacks a test for resetdb command for now. It should be added.

Closes #2198 from sekikn/AIRFLOW-1051

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15aee05d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15aee05d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15aee05d

Branch: refs/heads/master
Commit: 15aee05dd7104716b22ea7b01b220f9eaea3a72a
Parents: a9b20a0
Author: Kengo Seki
Authored: Tue Apr 4 08:37:08 2017 +0200
Committer: Bolke de Bruin
Committed: Tue Apr 4 08:37:08 2017 +0200

--
 tests/core.py | 3 +++
 1 file changed, 3 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15aee05d/tests/core.py
--
diff --git a/tests/core.py b/tests/core.py
index 7da08e1..8b3d1b8 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1089,6 +1089,9 @@ class CliTests(unittest.TestCase):
     def test_cli_initdb(self):
         cli.initdb(self.parser.parse_args(['initdb']))
 
+    def test_cli_resetdb(self):
+        cli.resetdb(self.parser.parse_args(['resetdb', '--yes']))
+
     def test_cli_connections_list(self):
         with mock.patch('sys.stdout',
                         new_callable=six.StringIO) as mock_stdout:
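The test pattern above (build the argparse parser, parse the argv a user would type, hand the namespace to the command function) can be shown self-contained. The subcommand and function names below are illustrative stand-ins, not Airflow's actual `cli` module:

```python
import argparse

def make_parser():
    # mirrors the shape of Airflow's CLI: one subparser per command
    parser = argparse.ArgumentParser(prog="airflow")
    sub = parser.add_subparsers(dest="command")
    resetdb = sub.add_parser("resetdb")
    resetdb.add_argument("--yes", action="store_true")
    return parser

def resetdb_command(args):
    # stand-in for cli.resetdb: --yes skips the interactive confirmation,
    # which is exactly why the test passes it
    return "reset" if args.yes else "aborted"

parser = make_parser()
args = parser.parse_args(["resetdb", "--yes"])
result = resetdb_command(args)
```

Passing `--yes` in the test matters: without it the real command would block waiting for confirmation on stdin.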
[jira] [Commented] (AIRFLOW-1051) Add a test for resetdb to CliTests
[ https://issues.apache.org/jira/browse/AIRFLOW-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954643#comment-15954643 ] ASF subversion and git services commented on AIRFLOW-1051: -- Commit 15aee05dd7104716b22ea7b01b220f9eaea3a72a in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=15aee05 ] [AIRFLOW-1051] Add a test for resetdb to CliTests CliTests lacks a test for resetdb command for now. It should be added. Closes #2198 from sekikn/AIRFLOW-1051 > Add a test for resetdb to CliTests > -- > > Key: AIRFLOW-1051 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1051 > Project: Apache Airflow > Issue Type: Test > Components: tests >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Minor > > CliTests lacks a test for resetdb command for now. It should be added. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (AIRFLOW-1004) `airflow webserver -D` runs in foreground
[ https://issues.apache.org/jira/browse/AIRFLOW-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-1004. - Resolution: Fixed > `airflow webserver -D` runs in foreground > - > > Key: AIRFLOW-1004 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1004 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Reporter: Ruslan Dautkhanov >Assignee: Kengo Seki > Labels: background, restart, rolling, webserver > Fix For: 1.8.1 > > > airflow webserver doesn't want to daemonize > {noformat} > $ airflow webserver --daemon > [2017-03-17 00:06:37,553] {__init__.py:57} INFO - Using executor LocalExecutor > .. skip .. > Running the Gunicorn Server with: > Workers: 4 sync > Host: 0.0.0.0:18111 > Timeout: 120 > Logfiles: - - > = > [2017-03-17 00:06:39,744] {__init__.py:57} INFO - Using executor LocalExecutor > {noformat} > webserver keeps running in foreground. > Sent email regarding this issue to dev list and according to [~bolke], > "This is a (known) bug, since the introduction of the rolling restarts" -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-276) List of dags does not refresh in UI for a while
[ https://issues.apache.org/jira/browse/AIRFLOW-276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954636#comment-15954636 ] ASF subversion and git services commented on AIRFLOW-276: - Commit a9b20a04b052e9479dbb79fd46124293085610e9 in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a9b20a0 ] [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background AIRFLOW-276 introduced a monitor process for gunicorn to find new files in the dag folder, but it also changed `airflow webserver -D`'s behavior to run in foreground. This PR fixes that by running the monitor as a daemon process. Closes #2208 from sekikn/AIRFLOW-1004 > List of dags does not refresh in UI for a while > --- > > Key: AIRFLOW-276 > URL: https://issues.apache.org/jira/browse/AIRFLOW-276 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Li Xuanji >Assignee: Li Xuanji >Priority: Minor > > After creating a new dag (eg by adding a file to `~/airflow/dags`), the web > UI does not show the new dag for a while. It only shows it when either > 1. gunicorn decides to restart the worker process, or > 2. a scheduler picks up the new dag, adds it to the airflow db, and the web > UI notices it in the db -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1004) `airflow webserver -D` runs in foreground
[ https://issues.apache.org/jira/browse/AIRFLOW-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954637#comment-15954637 ] ASF subversion and git services commented on AIRFLOW-1004: -- Commit a9b20a04b052e9479dbb79fd46124293085610e9 in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a9b20a0 ] [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background AIRFLOW-276 introduced a monitor process for gunicorn to find new files in the dag folder, but it also changed `airflow webserver -D`'s behavior to run in foreground. This PR fixes that by running the monitor as a daemon process. Closes #2208 from sekikn/AIRFLOW-1004 > `airflow webserver -D` runs in foreground > - > > Key: AIRFLOW-1004 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1004 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Reporter: Ruslan Dautkhanov >Assignee: Kengo Seki > Labels: background, restart, rolling, webserver > Fix For: 1.8.1 > > > airflow webserver doesn't want to daemonize > {noformat} > $ airflow webserver --daemon > [2017-03-17 00:06:37,553] {__init__.py:57} INFO - Using executor LocalExecutor > .. skip .. > Running the Gunicorn Server with: > Workers: 4 sync > Host: 0.0.0.0:18111 > Timeout: 120 > Logfiles: - - > = > [2017-03-17 00:06:39,744] {__init__.py:57} INFO - Using executor LocalExecutor > {noformat} > webserver keeps running in foreground. > Sent email regarding this issue to dev list and according to [~bolke], > "This is a (known) bug, since the introduction of the rolling restarts" -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-276) List of dags does not refresh in UI for a while
[ https://issues.apache.org/jira/browse/AIRFLOW-276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954635#comment-15954635 ] ASF subversion and git services commented on AIRFLOW-276: - Commit a9b20a04b052e9479dbb79fd46124293085610e9 in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a9b20a0 ] [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background AIRFLOW-276 introduced a monitor process for gunicorn to find new files in the dag folder, but it also changed `airflow webserver -D`'s behavior to run in foreground. This PR fixes that by running the monitor as a daemon process. Closes #2208 from sekikn/AIRFLOW-1004 > List of dags does not refresh in UI for a while > --- > > Key: AIRFLOW-276 > URL: https://issues.apache.org/jira/browse/AIRFLOW-276 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Li Xuanji >Assignee: Li Xuanji >Priority: Minor > > After creating a new dag (eg by adding a file to `~/airflow/dags`), the web > UI does not show the new dag for a while. It only shows it when either > 1. gunicorn decides to restart the worker process, or > 2. a scheduler picks up the new dag, adds it to the airflow db, and the web > UI notices it in the db -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1004) `airflow webserver -D` runs in foreground
[ https://issues.apache.org/jira/browse/AIRFLOW-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954634#comment-15954634 ] ASF subversion and git services commented on AIRFLOW-1004: -- Commit a9b20a04b052e9479dbb79fd46124293085610e9 in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a9b20a0 ] [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background AIRFLOW-276 introduced a monitor process for gunicorn to find new files in the dag folder, but it also changed `airflow webserver -D`'s behavior to run in foreground. This PR fixes that by running the monitor as a daemon process. Closes #2208 from sekikn/AIRFLOW-1004 > `airflow webserver -D` runs in foreground > - > > Key: AIRFLOW-1004 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1004 > Project: Apache Airflow > Issue Type: Bug > Components: webserver >Reporter: Ruslan Dautkhanov >Assignee: Kengo Seki > Labels: background, restart, rolling, webserver > Fix For: 1.8.1 > > > airflow webserver doesn't want to daemonize > {noformat} > $ airflow webserver --daemon > [2017-03-17 00:06:37,553] {__init__.py:57} INFO - Using executor LocalExecutor > .. skip .. > Running the Gunicorn Server with: > Workers: 4 sync > Host: 0.0.0.0:18111 > Timeout: 120 > Logfiles: - - > = > [2017-03-17 00:06:39,744] {__init__.py:57} INFO - Using executor LocalExecutor > {noformat} > webserver keeps running in foreground. > Sent email regarding this issue to dev list and according to [~bolke], > "This is a (known) bug, since the introduction of the rolling restarts" -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e4494f85e -> a9b20a04b

[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for gunicorn to find new files in the dag folder, but it also changed `airflow webserver -D`'s behavior to run in foreground. This PR fixes that by running the monitor as a daemon process.

Closes #2208 from sekikn/AIRFLOW-1004

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9b20a04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9b20a04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9b20a04

Branch: refs/heads/master
Commit: a9b20a04b052e9479dbb79fd46124293085610e9
Parents: e4494f8
Author: Kengo Seki
Authored: Tue Apr 4 08:32:44 2017 +0200
Committer: Bolke de Bruin
Committed: Tue Apr 4 08:32:44 2017 +0200

--
 airflow/bin/cli.py | 64 -
 tests/core.py      | 56 +++
 2 files changed, 109 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9b20a04/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e9c54e6..e4755c7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -753,7 +753,12 @@ def webserver(args):
         app.run(debug=True, port=args.port, host=args.hostname,
                 ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
     else:
-        pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
+        pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
+        if args.daemon:
+            handle = setup_logging(log_file)
+            stdout = open(stdout, 'w+')
+            stderr = open(stderr, 'w+')
+
         print(
             textwrap.dedent('''\
                 Running the Gunicorn Server with:
@@ -771,7 +776,6 @@ def webserver(args):
             '-t', str(worker_timeout),
             '-b', args.hostname + ':' + str(args.port),
             '-n', 'airflow-webserver',
-            '-p', str(pid),
             '-c', 'airflow.www.gunicorn_config'
         ]
@@ -782,28 +786,66 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]
 
         if args.daemon:
-            run_args += ["-D"]
+            run_args += ['-D', '-p', str(pid)]
+
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
 
         run_args += ["airflow.www.app:cached_app()"]
 
-        gunicorn_master_proc = subprocess.Popen(run_args)
+        gunicorn_master_proc = None
 
         def kill_proc(dummy_signum, dummy_frame):
             gunicorn_master_proc.terminate()
             gunicorn_master_proc.wait()
             sys.exit(0)
 
-        signal.signal(signal.SIGINT, kill_proc)
-        signal.signal(signal.SIGTERM, kill_proc)
+        def monitor_gunicorn(gunicorn_master_proc):
+            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+            if conf.getint('webserver', 'worker_refresh_interval') > 0:
+                restart_workers(gunicorn_master_proc, num_workers)
+            else:
+                while True:
+                    time.sleep(1)
 
-        # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-        if conf.getint('webserver', 'worker_refresh_interval') > 0:
-            restart_workers(gunicorn_master_proc, num_workers)
+        if args.daemon:
+            base, ext = os.path.splitext(pid)
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+                files_preserve=[handle],
+                stdout=stdout,
+                stderr=stderr,
+                signal_map={
+                    signal.SIGINT: kill_proc,
+                    signal.SIGTERM: kill_proc
+                },
+            )
+            with ctx:
+                subprocess.Popen(run_args)
+
+                # Reading pid file directly, since Popen#pid doesn't
+                # seem to return the right value with DaemonContext.
+                while True:
+                    try:
+                        with open(pid) as f:
+                            gunicorn_master_proc_pid = int(f.read())
+                        break
+                    except IOError:
+                        logging.debug("Waiting for gunicorn's pid file to be created.")
+                        time.sleep(0.1)
+
+                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+
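The subtle detail in the patch above is the comment "Reading pid file directly, since Popen#pid doesn't seem to return the right value with DaemonContext": after double-forking, the child's real pid must be recovered from the pidfile, with a retry loop because gunicorn may not have written it yet. A standalone version of that loop (with a timeout added here so a missing file cannot spin forever; the timeout is this sketch's addition, not in the patch):

```python
import os
import tempfile
import time

def read_pid_file(path, timeout=5.0, interval=0.1):
    """Poll a pidfile until it exists and holds a parseable integer."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with open(path) as f:
                return int(f.read())
        except (IOError, ValueError):
            # not created yet, or only partially written: retry until deadline
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval)

# Simulate the daemonized server writing its pidfile, then read it back.
pid_path = os.path.join(tempfile.mkdtemp(), "webserver.pid")
with open(pid_path, "w") as f:
    f.write("4242")
master_pid = read_pid_file(pid_path)
```

Catching `ValueError` as well as `IOError` matters: a reader can race the writer and observe an empty file, and `int("")` would otherwise escape the retry loop.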
[jira] [Commented] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified
[ https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954629#comment-15954629 ] ASF subversion and git services commented on AIRFLOW-1062: -- Commit 010b80aa8b417091705556a07d5970fe0cc4efb2 in incubator-airflow's branch refs/heads/v1-8-test from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=010b80a ] [AIRFLOW-1062] Fix DagRun#find to return correct result DagRun#find returns wrong result if external_trigger=False is specified, because adding filter is skipped on that condition. This PR fixes it. Closes #2210 from sekikn/AIRFLOW-1062 (cherry picked from commit e4494f85ed5593c99949b52e1e0044c2a35f097f) Signed-off-by: Bolke de Bruin> DagRun#find returns wrong result if external_trigger=False is specified > --- > > Key: AIRFLOW-1062 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1062 > Project: Apache Airflow > Issue Type: Bug > Components: models >Reporter: Kengo Seki >Assignee: Kengo Seki > Fix For: 1.8.1 > > > Given the following record, > {code} > sqlite> select id, external_trigger from dag_run; > 1|1 > sqlite> > {code} > the following code should return no result, > {code} > In [1]: from airflow import models > In [2]: models.DagRun.find(external_trigger=False) > {code} > ... but an externally-triggered record is returned erroneously. > {code} > Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>] > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified
[ https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-1062. - Resolution: Fixed Fix Version/s: 1.8.1 Issue resolved by pull request #2210 [https://github.com/apache/incubator-airflow/pull/2210] > DagRun#find returns wrong result if external_trigger=False is specified > --- > > Key: AIRFLOW-1062 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1062 > Project: Apache Airflow > Issue Type: Bug > Components: models >Reporter: Kengo Seki >Assignee: Kengo Seki > Fix For: 1.8.1 > > > Given the following record, > {code} > sqlite> select id, external_trigger from dag_run; > 1|1 > sqlite> > {code} > the following code should return no result, > {code} > In [1]: from airflow import models > In [2]: models.DagRun.find(external_trigger=False) > {code} > ... but an externally-triggered record is returned erroneously. > {code} > Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>] > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified
[ https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954628#comment-15954628 ] ASF subversion and git services commented on AIRFLOW-1062: -- Commit e4494f85ed5593c99949b52e1e0044c2a35f097f in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e4494f8 ] [AIRFLOW-1062] Fix DagRun#find to return correct result DagRun#find returns wrong result if external_trigger=False is specified, because adding filter is skipped on that condition. This PR fixes it. Closes #2210 from sekikn/AIRFLOW-1062 > DagRun#find returns wrong result if external_trigger=False is specified > --- > > Key: AIRFLOW-1062 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1062 > Project: Apache Airflow > Issue Type: Bug > Components: models >Reporter: Kengo Seki >Assignee: Kengo Seki > Fix For: 1.8.1 > > > Given the following record, > {code} > sqlite> select id, external_trigger from dag_run; > 1|1 > sqlite> > {code} > the following code should return no result, > {code} > In [1]: from airflow import models > In [2]: models.DagRun.find(external_trigger=False) > {code} > ... but an externally-triggered record is returned erroneously. > {code} > Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>] > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1062] Fix DagRun#find to return correct result
Repository: incubator-airflow
Updated Branches: refs/heads/v1-8-test 2bebeaf95 -> 010b80aa8

[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified, because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062

(cherry picked from commit e4494f85ed5593c99949b52e1e0044c2a35f097f)
Signed-off-by: Bolke de Bruin

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/010b80aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/010b80aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/010b80aa

Branch: refs/heads/v1-8-test
Commit: 010b80aa8b417091705556a07d5970fe0cc4efb2
Parents: 2bebeaf
Author: Kengo Seki
Authored: Tue Apr 4 08:30:40 2017 +0200
Committer: Bolke de Bruin
Committed: Tue Apr 4 08:31:05 2017 +0200

--
 airflow/models.py |  2 +-
 tests/models.py   | 33 +++++++++++++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/010b80aa/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index fdff54e..6828ab6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3925,7 +3925,7 @@ class DagRun(Base):
             qry = qry.filter(DR.execution_date == execution_date)
         if state:
             qry = qry.filter(DR.state == state)
-        if external_trigger:
+        if external_trigger is not None:
             qry = qry.filter(DR.external_trigger == external_trigger)

         dr = qry.order_by(DR.execution_date).all()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/010b80aa/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index c63c67e..6673c04 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -227,6 +227,39 @@ class DagRunTest(unittest.TestCase):
             'scheduled__2015-01-02T03:04:05', run_id,
             'Generated run_id did not match expectations: {0}'.format(run_id))

+    def test_dagrun_find(self):
+        session = settings.Session()
+        now = datetime.datetime.now()
+
+        dag_id1 = "test_dagrun_find_externally_triggered"
+        dag_run = models.DagRun(
+            dag_id=dag_id1,
+            run_id='manual__' + now.isoformat(),
+            execution_date=now,
+            start_date=now,
+            state=State.RUNNING,
+            external_trigger=True,
+        )
+        session.add(dag_run)
+
+        dag_id2 = "test_dagrun_find_not_externally_triggered"
+        dag_run = models.DagRun(
+            dag_id=dag_id2,
+            run_id='manual__' + now.isoformat(),
+            execution_date=now,
+            start_date=now,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+        session.add(dag_run)
+
+        session.commit()
+
+        self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
+        self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
+        self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
+        self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
+
     def test_dagrun_running_when_upstream_skipped(self):
         """
         Tests that a DAG run is not failed when an upstream task is skipped
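The root cause of this fix is a Python truthiness check: `if external_trigger:` treats an explicit `False` the same as "not specified", so the filter is silently dropped. A minimal standalone sketch (plain Python over a list of dicts, not Airflow's actual query code) of the before/after behavior:

```python
# Sketch (not Airflow's real code) of why a truthiness check drops an
# explicit False filter, and why `is not None` fixes it.

def find_runs(runs, external_trigger=None):
    """Filter a list of run records the way the fixed DagRun.find builds its query."""
    result = runs
    # The fix: check against None, not truthiness. With the old
    # `if external_trigger:` check, external_trigger=False skipped this branch
    # and every run was returned, including externally triggered ones.
    if external_trigger is not None:
        result = [r for r in result if r["external_trigger"] == external_trigger]
    return result

runs = [{"id": 1, "external_trigger": True}]

assert find_runs(runs, external_trigger=False) == []    # filter now applied
assert len(find_runs(runs, external_trigger=True)) == 1
assert find_runs(runs) == runs                          # None still means "don't filter"
```

The same pattern applies to any optional filter whose legal values include falsy ones (`False`, `0`, `""`): the "was this argument supplied?" test must be `is not None`, not truthiness.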
[jira] [Commented] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified
[ https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954630#comment-15954630 ]

ASF subversion and git services commented on AIRFLOW-1062:
--

Commit 010b80aa8b417091705556a07d5970fe0cc4efb2 in incubator-airflow's branch refs/heads/v1-8-test from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=010b80a ]

[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified, because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062

(cherry picked from commit e4494f85ed5593c99949b52e1e0044c2a35f097f)
Signed-off-by: Bolke de Bruin

> DagRun#find returns wrong result if external_trigger=False is specified
> ---
>
> Key: AIRFLOW-1062
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1062
> Project: Apache Airflow
> Issue Type: Bug
> Components: models
> Reporter: Kengo Seki
> Assignee: Kengo Seki
> Fix For: 1.8.1
>
>
> Given the following record,
> {code}
> sqlite> select id, external_trigger from dag_run;
> 1|1
> sqlite>
> {code}
> the following code should return no result,
> {code}
> In [1]: from airflow import models
> In [2]: models.DagRun.find(external_trigger=False)
> {code}
> ... but an externally-triggered record is returned erroneously.
> {code}
> Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>]
> {code}

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified
[ https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954627#comment-15954627 ]

ASF subversion and git services commented on AIRFLOW-1062:
--

Commit e4494f85ed5593c99949b52e1e0044c2a35f097f in incubator-airflow's branch refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e4494f8 ]

[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified, because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062

> DagRun#find returns wrong result if external_trigger=False is specified
> ---
>
> Key: AIRFLOW-1062
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1062
> Project: Apache Airflow
> Issue Type: Bug
> Components: models
> Reporter: Kengo Seki
> Assignee: Kengo Seki
> Fix For: 1.8.1
>
>
> Given the following record,
> {code}
> sqlite> select id, external_trigger from dag_run;
> 1|1
> sqlite>
> {code}
> the following code should return no result,
> {code}
> In [1]: from airflow import models
> In [2]: models.DagRun.find(external_trigger=False)
> {code}
> ... but an externally-triggered record is returned erroneously.
> {code}
> Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>]
> {code}

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1062] Fix DagRun#find to return correct result
Repository: incubator-airflow
Updated Branches: refs/heads/master 56501e606 -> e4494f85e

[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified, because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e4494f85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e4494f85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e4494f85

Branch: refs/heads/master
Commit: e4494f85ed5593c99949b52e1e0044c2a35f097f
Parents: 56501e6
Author: Kengo Seki
Authored: Tue Apr 4 08:30:40 2017 +0200
Committer: Bolke de Bruin
Committed: Tue Apr 4 08:30:40 2017 +0200

--
 airflow/models.py |  2 +-
 tests/models.py   | 33 +++++++++++++++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4494f85/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 5835578..7171c05 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3969,7 +3969,7 @@ class DagRun(Base):
             qry = qry.filter(DR.execution_date == execution_date)
         if state:
             qry = qry.filter(DR.state == state)
-        if external_trigger:
+        if external_trigger is not None:
             qry = qry.filter(DR.external_trigger == external_trigger)

         dr = qry.order_by(DR.execution_date).all()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4494f85/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index dcba354..43fccca 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -227,6 +227,39 @@ class DagRunTest(unittest.TestCase):
             'scheduled__2015-01-02T03:04:05', run_id,
             'Generated run_id did not match expectations: {0}'.format(run_id))

+    def test_dagrun_find(self):
+        session = settings.Session()
+        now = datetime.datetime.now()
+
+        dag_id1 = "test_dagrun_find_externally_triggered"
+        dag_run = models.DagRun(
+            dag_id=dag_id1,
+            run_id='manual__' + now.isoformat(),
+            execution_date=now,
+            start_date=now,
+            state=State.RUNNING,
+            external_trigger=True,
+        )
+        session.add(dag_run)
+
+        dag_id2 = "test_dagrun_find_not_externally_triggered"
+        dag_run = models.DagRun(
+            dag_id=dag_id2,
+            run_id='manual__' + now.isoformat(),
+            execution_date=now,
+            start_date=now,
+            state=State.RUNNING,
+            external_trigger=False,
+        )
+        session.add(dag_run)
+
+        session.commit()
+
+        self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
+        self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
+        self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
+        self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
+
     def test_dagrun_running_when_upstream_skipped(self):
         """
         Tests that a DAG run is not failed when an upstream task is skipped
[jira] [Commented] (AIRFLOW-1011) Fix bug in BackfillJob._execute() for SubDAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954623#comment-15954623 ]

ASF subversion and git services commented on AIRFLOW-1011:
--

Commit 2bebeaf9554d35710de6eb1b4006157e105ac79b in incubator-airflow's branch refs/heads/v1-8-test from [~jschmid]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=2bebeaf ]

[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or equal to the end date before creating a DAG run and task instances. For SubDAGs, the next run date is not relevant, i.e. schedule_interval can be anything other than None or '@once' and should be ignored. However, current code calculates the next run date for a SubDAG and the condition check mentioned above always fails for SubDAG triggered manually. This change adds a simple check to determine if this is a SubDAG and, if so, sets next run date to DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

(cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc)
Signed-off-by: Bolke de Bruin

> Fix bug in BackfillJob._execute() for SubDAGs
> ---
>
> Key: AIRFLOW-1011
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1011
> Project: Apache Airflow
> Issue Type: Bug
> Components: backfill, subdag
> Affects Versions: 1.8.0
> Reporter: Joe Schmid
> Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 1-TopLevelDAGTaskInstancesShownCorrectly.png,
> 2-ZoomedSubDAG-NoTaskInstances-v1.8.png,
> 3-ZoomedSubDAG-TaskInstances-v1.7.1.3.png, subdag_task_instance_logs.txt,
> test_subdag.py
>
>
> The attached test SubDAG is not executed when the parent DAG is triggered manually. Attached is a simple test DAG that exhibits the issue along with screenshots showing the UI differences between v1.8 and v1.7.1.3.
> Note that if the DAG is run via backfill from command line (e.g. "airflow backfill Test_SubDAG -s 2017-03-18 -e 2017-03-18") the task instances show up successfully.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (AIRFLOW-1011) Fix bug in BackfillJob._execute() for SubDAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin resolved AIRFLOW-1011.
-------------------------------------
    Resolution: Fixed

Issue resolved by pull request #2179
[https://github.com/apache/incubator-airflow/pull/2179]

> Fix bug in BackfillJob._execute() for SubDAGs
> ---
>
> Key: AIRFLOW-1011
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1011
> Project: Apache Airflow
> Issue Type: Bug
> Components: backfill, subdag
> Affects Versions: 1.8.0
> Reporter: Joe Schmid
> Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 1-TopLevelDAGTaskInstancesShownCorrectly.png,
> 2-ZoomedSubDAG-NoTaskInstances-v1.8.png,
> 3-ZoomedSubDAG-TaskInstances-v1.7.1.3.png, subdag_task_instance_logs.txt,
> test_subdag.py
>
>
> The attached test SubDAG is not executed when the parent DAG is triggered manually. Attached is a simple test DAG that exhibits the issue along with screenshots showing the UI differences between v1.8 and v1.7.1.3.
> Note that if the DAG is run via backfill from command line (e.g. "airflow backfill Test_SubDAG -s 2017-03-18 -e 2017-03-18") the task instances show up successfully.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1011) Fix bug in BackfillJob._execute() for SubDAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954620#comment-15954620 ]

ASF subversion and git services commented on AIRFLOW-1011:
--

Commit 56501e6062df9456f7ac4efe94e21940734dd5bc in incubator-airflow's branch refs/heads/master from [~jschmid]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=56501e6 ]

[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or equal to the end date before creating a DAG run and task instances. For SubDAGs, the next run date is not relevant, i.e. schedule_interval can be anything other than None or '@once' and should be ignored. However, current code calculates the next run date for a SubDAG and the condition check mentioned above always fails for SubDAG triggered manually. This change adds a simple check to determine if this is a SubDAG and, if so, sets next run date to DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

> Fix bug in BackfillJob._execute() for SubDAGs
> ---
>
> Key: AIRFLOW-1011
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1011
> Project: Apache Airflow
> Issue Type: Bug
> Components: backfill, subdag
> Affects Versions: 1.8.0
> Reporter: Joe Schmid
> Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 1-TopLevelDAGTaskInstancesShownCorrectly.png,
> 2-ZoomedSubDAG-NoTaskInstances-v1.8.png,
> 3-ZoomedSubDAG-TaskInstances-v1.7.1.3.png, subdag_task_instance_logs.txt,
> test_subdag.py
>
>
> The attached test SubDAG is not executed when the parent DAG is triggered manually. Attached is a simple test DAG that exhibits the issue along with screenshots showing the UI differences between v1.8 and v1.7.1.3.
> Note that if the DAG is run via backfill from command line (e.g. "airflow backfill Test_SubDAG -s 2017-03-18 -e 2017-03-18") the task instances show up successfully.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs
Repository: incubator-airflow
Updated Branches: refs/heads/master 75addb4a9 -> 56501e606

[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or equal to the end date before creating a DAG run and task instances. For SubDAGs, the next run date is not relevant, i.e. schedule_interval can be anything other than None or '@once' and should be ignored. However, current code calculates the next run date for a SubDAG and the condition check mentioned above always fails for SubDAG triggered manually. This change adds a simple check to determine if this is a SubDAG and, if so, sets next run date to DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/56501e60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/56501e60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/56501e60

Branch: refs/heads/master
Commit: 56501e6062df9456f7ac4efe94e21940734dd5bc
Parents: 75addb4
Author: Joe Schmid
Authored: Tue Apr 4 08:27:45 2017 +0200
Committer: Bolke de Bruin
Committed: Tue Apr 4 08:27:45 2017 +0200

--
 airflow/jobs.py   |  7 +++++--
 airflow/models.py |  1 +
 tests/jobs.py     | 28 ++++++++++++++++++++++++++++
 3 files changed, 34 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 006a180..b5c2d5d 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob):

         # consider max_active_runs but ignore when running subdags
         # "parent.child" as a dag_id is by convention a subdag
-        if self.dag.schedule_interval and "." not in self.dag.dag_id:
+        if self.dag.schedule_interval and not self.dag.is_subdag:
             active_runs = DagRun.find(
                 dag_id=self.dag.dag_id,
                 state=State.RUNNING,
@@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob):

         # create dag runs
         dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
-        next_run_date = self.dag.normalize_schedule(dr_start_date)
         end_date = end_date or datetime.now()
+        # next run date for a subdag isn't relevant (schedule_interval for subdags
+        # is ignored) so we use the dag run's start date in the case of a subdag
+        next_run_date = (self.dag.normalize_schedule(dr_start_date)
+                         if not self.dag.is_subdag else dr_start_date)

         active_dag_runs = []
         while next_run_date and next_run_date <= end_date:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 9d560fb..5835578 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2716,6 +2716,7 @@ class DAG(BaseDag, LoggingMixin):
         self.default_view = default_view
         self.orientation = orientation
         self.catchup = catchup
+        self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate

         self.partial = False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/tests/jobs.py
--
diff --git a/tests/jobs.py b/tests/jobs.py
index c1d6790..3eb407b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase):
             else:
                 self.assertEqual(State.NONE, ti.state)

+    def test_backfill_execute_subdag(self):
+        dag = self.dagbag.get_dag('example_subdag_operator')
+        subdag_op_task = dag.get_task('section-1')
+
+        subdag = subdag_op_task.subdag
+        subdag.schedule_interval = '@daily'
+
+        start_date = datetime.datetime.now()
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=subdag,
+                          start_date=start_date,
+                          end_date=start_date,
+                          executor=executor,
+                          donot_pickle=True)
+        job.run()
+
+        history = executor.history
+        subdag_history = history[0]
+
+        # check that all 5 task instances of the subdag 'section-1' were executed
+        self.assertEqual(5, len(subdag_history))
+        for sdh in subdag_history:
+            ti = sdh[3]
+            self.assertIn('section-1-task-', ti.task_id)
+
+        subdag.clear()
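The core of this fix is the `next_run_date` calculation: for a subdag, the schedule interval is ignored and the first run date is simply the backfill's start date, so the `next_run_date <= end_date` loop condition can be satisfied by a manual trigger. A hedged, self-contained sketch of that date logic (toy `normalize_schedule` stand-in, not Airflow's real implementation):

```python
# Sketch of the BackfillJob date logic this commit changes. The helper
# names below are illustrative stand-ins, not Airflow's actual API.
from datetime import datetime, timedelta

def normalize_schedule(start, interval):
    """Round start up to the next scheduled tick (toy stand-in for
    DAG.normalize_schedule)."""
    epoch = datetime(2017, 1, 1)
    ticks = (start - epoch) // interval        # whole intervals since epoch
    aligned = epoch + ticks * interval
    return aligned if aligned >= start else aligned + interval

def first_run_date(start, interval, is_subdag):
    # The fix: subdags skip normalization entirely, so a manually triggered
    # backfill with end_date == start_date still creates a DAG run.
    return start if is_subdag else normalize_schedule(start, interval)

start = datetime(2017, 4, 4, 10, 30)
daily = timedelta(days=1)

# A top-level DAG's first run is aligned forward to its schedule, so with
# end_date == start_date the `next_run_date <= end_date` check fails...
assert first_run_date(start, daily, is_subdag=False) > start
# ...whereas a subdag uses the start date directly and the check passes.
assert first_run_date(start, daily, is_subdag=True) <= start
```

This is also why the commit replaces the `"." not in dag_id` convention with an explicit `is_subdag` flag: the dag_id string check is incidental, while the flag states the intent directly.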
incubator-airflow git commit: [AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs
Repository: incubator-airflow
Updated Branches: refs/heads/v1-8-test 68b1c982e -> 2bebeaf95

[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or equal to the end date before creating a DAG run and task instances. For SubDAGs, the next run date is not relevant, i.e. schedule_interval can be anything other than None or '@once' and should be ignored. However, current code calculates the next run date for a SubDAG and the condition check mentioned above always fails for SubDAG triggered manually. This change adds a simple check to determine if this is a SubDAG and, if so, sets next run date to DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

(cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc)
Signed-off-by: Bolke de Bruin

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2bebeaf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2bebeaf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2bebeaf9

Branch: refs/heads/v1-8-test
Commit: 2bebeaf9554d35710de6eb1b4006157e105ac79b
Parents: 68b1c98
Author: Joe Schmid
Authored: Tue Apr 4 08:27:45 2017 +0200
Committer: Bolke de Bruin
Committed: Tue Apr 4 08:28:07 2017 +0200

--
 airflow/jobs.py   |  7 +++++--
 airflow/models.py |  1 +
 tests/jobs.py     | 28 ++++++++++++++++++++++++++++
 3 files changed, 34 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 222d9ba..7db9b9c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob):

         # consider max_active_runs but ignore when running subdags
         # "parent.child" as a dag_id is by convention a subdag
-        if self.dag.schedule_interval and "." not in self.dag.dag_id:
+        if self.dag.schedule_interval and not self.dag.is_subdag:
             active_runs = DagRun.find(
                 dag_id=self.dag.dag_id,
                 state=State.RUNNING,
@@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob):

         # create dag runs
         dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
-        next_run_date = self.dag.normalize_schedule(dr_start_date)
         end_date = end_date or datetime.now()
+        # next run date for a subdag isn't relevant (schedule_interval for subdags
+        # is ignored) so we use the dag run's start date in the case of a subdag
+        next_run_date = (self.dag.normalize_schedule(dr_start_date)
+                         if not self.dag.is_subdag else dr_start_date)

         active_dag_runs = []
         while next_run_date and next_run_date <= end_date:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index bdda701..fdff54e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2682,6 +2682,7 @@ class DAG(BaseDag, LoggingMixin):
         self.sla_miss_callback = sla_miss_callback
         self.orientation = orientation
         self.catchup = catchup
+        self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate

         self.partial = False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/tests/jobs.py
--
diff --git a/tests/jobs.py b/tests/jobs.py
index aee0e9c..f9ede68 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase):
             else:
                 self.assertEqual(State.NONE, ti.state)

+    def test_backfill_execute_subdag(self):
+        dag = self.dagbag.get_dag('example_subdag_operator')
+        subdag_op_task = dag.get_task('section-1')
+
+        subdag = subdag_op_task.subdag
+        subdag.schedule_interval = '@daily'
+
+        start_date = datetime.datetime.now()
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=subdag,
+                          start_date=start_date,
+                          end_date=start_date,
+                          executor=executor,
+                          donot_pickle=True)
+        job.run()
+
+        history = executor.history
+        subdag_history = history[0]
+
+        # check that all 5 task instances of the subdag 'section-1' were executed
+        self.assertEqual(5, len(subdag_history))
+        for