[jira] [Updated] (AIRFLOW-1070) Task instance run should catch SIGINT as well as SIGTERM

2017-04-04 Thread Andrew Chen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Chen updated AIRFLOW-1070:
-
Priority: Trivial  (was: Major)

> Task instance run should catch SIGINT as well as SIGTERM
> 
>
> Key: AIRFLOW-1070
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1070
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Andrew Chen
>Priority: Trivial
>
> At 
> https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1369
>  we catch only the SIGTERM signal.
> It'd be nice to catch SIGINT as well, since I'd like my task to be killed when
> I send Ctrl-C to my ``airflow test`` process.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1070) Task instance run should catch SIGINT as well as SIGTERM

2017-04-04 Thread Andrew Chen (JIRA)
Andrew Chen created AIRFLOW-1070:


 Summary: Task instance run should catch SIGINT as well as SIGTERM
 Key: AIRFLOW-1070
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1070
 Project: Apache Airflow
  Issue Type: New Feature
Reporter: Andrew Chen


At 
https://github.com/apache/incubator-airflow/blob/master/airflow/models.py#L1369 
we catch only the SIGTERM signal.

It'd be nice to catch SIGINT as well, since I'd like my task to be killed when I
send Ctrl-C to my ``airflow test`` process.
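A minimal sketch of what catching both signals could look like, assuming the
existing SIGTERM handling simply fails the task with an AirflowException (the
handler name below is illustrative, not the actual models.py code):

{code}
import signal

from airflow.exceptions import AirflowException


def _kill_task(signum, frame):
    # Illustrative handler: fail the task so its kill/cleanup path runs.
    raise AirflowException("Task received signal %s" % signum)


# Registering the same handler for SIGINT makes Ctrl-C on an
# `airflow test` process behave like the SIGTERM sent by an executor.
signal.signal(signal.SIGTERM, _kill_task)
signal.signal(signal.SIGINT, _kill_task)
{code}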



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (AIRFLOW-1069) Pool slots not obeyed

2017-04-04 Thread Alex Guziel (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alex Guziel closed AIRFLOW-1069.

Resolution: Invalid

> Pool slots not obeyed
> -
>
> Key: AIRFLOW-1069
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1069
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Alex Guziel
>Assignee: Alex Guziel
>
> Right now, the decrement is done in an incorrect way that is not preserved 
> across iterations



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1069) Pool slots not obeyed

2017-04-04 Thread Alex Guziel (JIRA)
Alex Guziel created AIRFLOW-1069:


 Summary: Pool slots not obeyed
 Key: AIRFLOW-1069
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1069
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Alex Guziel
Assignee: Alex Guziel


Right now, the decrement is done in an incorrect way that is not preserved 
across iterations



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1068) Autocommit error with Airflow v1.7.1.3 and pymssql v2.1.3

2017-04-04 Thread Thomas Christie (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Christie updated AIRFLOW-1068:
-
Environment: 
OS: Centos 6
pymssql: 2.1.3

  was:OS: Centos 6

Description: 
After upgrading pymssql, I started getting the following error when trying to use
the MSSQL hook.

[2017-04-04 13:02:27,260] {models.py:1286} ERROR - 'pymssql.Connection' 
object attribute 'autocommit' is read-only
Traceback (most recent call last):
  File 
"/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
 line 1245, in run
result = task_copy.execute(context=context)
  File 
"/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/operators/mssql_operator.py",
 line 34, in execute
hook.run(self.sql, parameters=self.parameters)
  File 
"/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py",
 line 124, in run
self.set_autocommit(conn, autocommit)
  File 
"/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py",
 line 138, in set_autocommit
conn.autocommit = autocommit
AttributeError: 'pymssql.Connection' object attribute 'autocommit' is 
read-only

I looked at the dbapi_hook.py file and this is the offending line:

def set_autocommit(self, conn, autocommit):
    conn.autocommit = autocommit

Changing the line to:

conn.autocommit(autocommit)

seems to work.  From what I understand, autocommit was a getter/setter method 
in pymssql versions <2.0.0.  Maybe they've reverted the behavior?
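For reference, a version-tolerant variant of set_autocommit (a sketch only, not
the actual Airflow patch) could check whether the driver exposes autocommit as a
method before falling back to attribute assignment:

{code}
def set_autocommit(self, conn, autocommit):
    # Sketch: pymssql >= 2.0.0 exposes autocommit() as a method,
    # while most DB-API drivers use a writable attribute.
    if callable(getattr(conn, 'autocommit', None)):
        conn.autocommit(autocommit)
    else:
        conn.autocommit = autocommit
{code}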

> Autocommit error with Airflow v1.7.1.3 and pymssql v2.1.3
> -
>
> Key: AIRFLOW-1068
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1068
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: db, hooks
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Centos 6
> pymssql: 2.1.3
>Reporter: Thomas Christie
>
> After upgrading pymssql, I started getting the following error when trying to
> use the MSSQL hook.
> [2017-04-04 13:02:27,260] {models.py:1286} ERROR - 'pymssql.Connection' 
> object attribute 'autocommit' is read-only
> Traceback (most recent call last):
>   File 
> "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py",
>  line 1245, in run
> result = task_copy.execute(context=context)
>   File 
> "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/operators/mssql_operator.py",
>  line 34, in execute
> hook.run(self.sql, parameters=self.parameters)
>   File 
> "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py",
>  line 124, in run
> self.set_autocommit(conn, autocommit)
>   File 
> "/home/jobrunner/.virtualenvs/airflow/lib/python3.5/site-packages/airflow/hooks/dbapi_hook.py",
>  line 138, in set_autocommit
> conn.autocommit = autocommit
> AttributeError: 'pymssql.Connection' object attribute 'autocommit' is 
> read-only
> I looked at the dbapi_hook.py file and this is the offending line:
> def set_autocommit(self, conn, autocommit):
>     conn.autocommit = autocommit
> Changing the line to:
> conn.autocommit(autocommit)
> seems to work.  From what I understand, autocommit was a getter/setter method 
> in pymssql versions <2.0.0.  Maybe they've reverted the behavior?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1068) Autocommit error with Airflow v1.7.1.3 and pymssql v2.1.3

2017-04-04 Thread Thomas Christie (JIRA)
Thomas Christie created AIRFLOW-1068:


 Summary: Autocommit error with Airflow v1.7.1.3 and pymssql v2.1.3
 Key: AIRFLOW-1068
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1068
 Project: Apache Airflow
  Issue Type: Bug
  Components: db, hooks
Affects Versions: Airflow 1.7.1.3
 Environment: OS: Centos 6
Reporter: Thomas Christie






--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (AIRFLOW-947) Make PrestoHook surface better messages when the Presto Cluster is unavailable.

2017-04-04 Thread Arthur Wiedmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arthur Wiedmer resolved AIRFLOW-947.

   Resolution: Fixed
Fix Version/s: 1.9.0

Issue resolved by pull request #2128
[https://github.com/apache/incubator-airflow/pull/2128]

> Make PrestoHook surface better messages when the Presto Cluster is 
> unavailable.
> ---
>
> Key: AIRFLOW-947
> URL: https://issues.apache.org/jira/browse/AIRFLOW-947
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Arthur Wiedmer
>Assignee: Arthur Wiedmer
>Priority: Minor
> Fix For: 1.9.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-947) Make PrestoHook surface better messages when the Presto Cluster is unavailable.

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1590#comment-1590
 ] 

ASF subversion and git services commented on AIRFLOW-947:
-

Commit 6dd4b3bc1f366e1f4f4b42d6781f1caee7a5827a in incubator-airflow's branch 
refs/heads/master from [~artwr]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=6dd4b3b ]

[AIRFLOW-947] Improve exceptions for unavailable Presto cluster

This improves error logging when the Presto cluster is unavailable
and the underlying error is a 503 http response. This introspects
the error to prevent trying to access the 'message' attribute when
not present.
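A rough sketch of the kind of introspection described (the helper name is
hypothetical; the real change lives in airflow/hooks/presto_hook.py in the
commit above):

{code}
def _get_pretty_exception_message(e):
    # Prefer the structured Presto error payload when it is present;
    # otherwise fall back to the raw exception text (e.g. a 503 body).
    if hasattr(e, 'message') and isinstance(e.message, dict) \
            and 'message' in e.message:
        return e.message['message']
    return str(e)
{code}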


> Make PrestoHook surface better messages when the Presto Cluster is 
> unavailable.
> ---
>
> Key: AIRFLOW-947
> URL: https://issues.apache.org/jira/browse/AIRFLOW-947
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Arthur Wiedmer
>Assignee: Arthur Wiedmer
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[2/2] incubator-airflow git commit: Merge pull request #2128 from artwr/artwr-improve_presto_hook_error_when_cluster_is_unavailable

2017-04-04 Thread arthur
Merge pull request #2128 from 
artwr/artwr-improve_presto_hook_error_when_cluster_is_unavailable


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f5462c78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f5462c78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f5462c78

Branch: refs/heads/master
Commit: f5462c78ff38ec59ec30c688097ff5bb3b3541bb
Parents: 70f1bf1 6dd4b3b
Author: Arthur Wiedmer 
Authored: Tue Apr 4 11:20:54 2017 -0700
Committer: Arthur Wiedmer 
Committed: Tue Apr 4 11:20:54 2017 -0700

--
 airflow/hooks/presto_hook.py | 26 --
 1 file changed, 16 insertions(+), 10 deletions(-)
--




[jira] [Commented] (AIRFLOW-1060) dag lost tracking the status of tasks and stuck in running state

2017-04-04 Thread Serhii (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955370#comment-15955370
 ] 

Serhii commented on AIRFLOW-1060:
-

I have the same issue. It is 100% reproducible on versions 1.7.1.2 and 1.7.1.3.
If a task takes more than 1 hour, the subdag containing this task gets stuck in
the running state.

In the logs for the SubDagOperator I found this mysterious error:
 
[2017-04-04 15:04:35,076] {jobs.py:965} ERROR - The airflow run command failed 
at reporting an error. This should not occur in normal circumstances. Task 
state is 'running',reported state is 'success'. TI is 

> dag lost tracking the status of tasks and stuck in running state
> 
>
> Key: AIRFLOW-1060
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1060
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Jeff Liu
>
> I'm running airflow 1.7.1 in one of my environments and constantly run into
> an issue where the main dag status is stuck in the "running" state, while the
> tasks have all completed successfully.
> To resolve the issue, I had to "delete" the dag entry in the airflow UI and
> re-run the job manually so the dag can recognize that the tasks are all
> completed and set itself to successful after the re-run.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1067] use example.com in examples

2017-04-04 Thread arthur
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8fdfb16cc -> 70f1bf10a


[AIRFLOW-1067] use example.com in examples

We use airf...@airflow.com in examples. However,
https://airflow.com
is owned by a company named Airflow (selling fans,
etc). We should use
airf...@example.com instead. That domain is
created for this purpose.

Closes #2217 from mengxr/AIRFLOW-1067


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/70f1bf10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/70f1bf10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/70f1bf10

Branch: refs/heads/master
Commit: 70f1bf10a5a5ab8f7460d3c0dc5c1a6d955355de
Parents: 8fdfb16
Author: Xiangrui Meng 
Authored: Tue Apr 4 09:22:37 2017 -0700
Committer: Arthur Wiedmer 
Committed: Tue Apr 4 09:22:37 2017 -0700

--
 airflow/api/auth/backend/default.py| 2 +-
 airflow/config_templates/default_airflow.cfg   | 2 +-
 airflow/config_templates/default_test.cfg  | 2 +-
 .../example_dags/example_emr_job_flow_automatic_steps.py   | 2 +-
 .../contrib/example_dags/example_emr_job_flow_manual_steps.py  | 2 +-
 airflow/contrib/example_dags/example_qubole_operator.py| 2 +-
 airflow/contrib/example_dags/example_twitter_dag.py| 2 +-
 airflow/contrib/task_runner/__init__.py| 2 +-
 airflow/dag/__init__.py| 2 +-
 airflow/example_dags/docker_copy_data.py   | 2 +-
 airflow/example_dags/example_docker_operator.py| 2 +-
 airflow/example_dags/example_http_operator.py  | 2 +-
 airflow/example_dags/tutorial.py   | 2 +-
 docs/scheduler.rst | 2 +-
 docs/tutorial.rst  | 6 +++---
 scripts/ci/airflow_travis.cfg  | 2 +-
 tests/dags/test_retry_handling_job.py  | 2 +-
 17 files changed, 19 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70f1bf10/airflow/api/auth/backend/default.py
--
diff --git a/airflow/api/auth/backend/default.py 
b/airflow/api/auth/backend/default.py
index 64cae86..49453ea 100644
--- a/airflow/api/auth/backend/default.py
+++ b/airflow/api/auth/backend/default.py
@@ -26,4 +26,4 @@ def requires_authentication(function):
 def decorated(*args, **kwargs):
 return function(*args, **kwargs)
 
-return decorated
\ No newline at end of file
+return decorated

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70f1bf10/airflow/config_templates/default_airflow.cfg
--
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index 77c65ca..b28256a 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -231,7 +231,7 @@ smtp_ssl = False
 # smtp_user = airflow
 # smtp_password = airflow
 smtp_port = 25
-smtp_mail_from = airf...@airflow.com
+smtp_mail_from = airf...@example.com
 
 
 [celery]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70f1bf10/airflow/config_templates/default_test.cfg
--
diff --git a/airflow/config_templates/default_test.cfg 
b/airflow/config_templates/default_test.cfg
index 2d31141..2fb5bb0 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -65,7 +65,7 @@ smtp_host = localhost
 smtp_user = airflow
 smtp_port = 25
 smtp_password = airflow
-smtp_mail_from = airf...@airflow.com
+smtp_mail_from = airf...@example.com
 
 [celery]
 celery_app_name = airflow.executors.celery_executor

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/70f1bf10/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
--
diff --git 
a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py 
b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
index 7f57ad1..b03b36f 100644
--- a/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
+++ b/airflow/contrib/example_dags/example_emr_job_flow_automatic_steps.py
@@ -22,7 +22,7 @@ DEFAULT_ARGS = {
 'owner': 'airflow',
 'depends_on_past': False,
 'start_date': airflow.utils.dates.days_ago(2),
-'email': ['airf...@airflow.com'],
+'email': 

[jira] [Commented] (AIRFLOW-1067) Should not use airf...@airflow.com in examples

2017-04-04 Thread Arthur Wiedmer (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955329#comment-15955329
 ] 

Arthur Wiedmer commented on AIRFLOW-1067:
-

Duplicate of https://issues.apache.org/jira/browse/AIRFLOW-1066. We had the same
idea.

> Should not use airf...@airflow.com in examples
> --
>
> Key: AIRFLOW-1067
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1067
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Minor
>
> airflow.com is owned by a company named Airflow (selling fans, etc). We 
> should use airf...@example.com in all examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (AIRFLOW-1067) Should not use airf...@airflow.com in examples

2017-04-04 Thread Xiangrui Meng (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-1067 started by Xiangrui Meng.
--
> Should not use airf...@airflow.com in examples
> --
>
> Key: AIRFLOW-1067
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1067
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Minor
>
> airflow.com is owned by a company named Airflow (selling fans, etc). We 
> should use airf...@example.com in all examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1067) Should not use airf...@airflow.com in examples

2017-04-04 Thread Xiangrui Meng (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15955302#comment-15955302
 ] 

Xiangrui Meng commented on AIRFLOW-1067:


https://github.com/apache/incubator-airflow/pull/2217

> Should not use airf...@airflow.com in examples
> --
>
> Key: AIRFLOW-1067
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1067
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Xiangrui Meng
>Assignee: Xiangrui Meng
>Priority: Minor
>
> airflow.com is owned by a company named Airflow (selling fans, etc). We 
> should use airf...@example.com in all examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1067) Should not use airf...@airflow.com in examples

2017-04-04 Thread Xiangrui Meng (JIRA)
Xiangrui Meng created AIRFLOW-1067:
--

 Summary: Should not use airf...@airflow.com in examples
 Key: AIRFLOW-1067
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1067
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Xiangrui Meng
Assignee: Xiangrui Meng
Priority: Minor


airflow.com is owned by a company named Airflow (selling fans, etc). We should 
use airf...@example.com in all examples.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1066) Replace instances of airf...@airflow.com with airf...@example.com

2017-04-04 Thread Arthur Wiedmer (JIRA)
Arthur Wiedmer created AIRFLOW-1066:
---

 Summary: Replace instances of airf...@airflow.com with 
airf...@example.com
 Key: AIRFLOW-1066
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1066
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Arthur Wiedmer
Assignee: Arthur Wiedmer
Priority: Trivial


airflow.com is a website registered to a company selling fans :) We can use
example.com as the domain name instead.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1064] Change default sort to job_id for TaskInstanceModelView

2017-04-04 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4a6bef69d -> 8fdfb16cc


[AIRFLOW-1064] Change default sort to job_id for TaskInstanceModelView

The TaskInstanceModelView's default sort column is unindexed.
We shouldn't need an index on start_date, and
job_id is just as logical a default sort.

Closes #2215 from saguziel/aguziel-fix-ti-page


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8fdfb16c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8fdfb16c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8fdfb16c

Branch: refs/heads/master
Commit: 8fdfb16cc3c0903edd8b89b836f5bdf8bf371ce3
Parents: 4a6bef6
Author: Alex Guziel 
Authored: Tue Apr 4 17:19:43 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 17:19:43 2017 +0200

--
 airflow/www/views.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8fdfb16c/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index a9bab31..3973866 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2336,7 +2336,7 @@ class TaskInstanceModelView(ModelViewOnly):
 queued_dttm=datetime_f,
 dag_id=dag_link, duration=duration_f)
 column_searchable_list = ('dag_id', 'task_id', 'state')
-column_default_sort = ('start_date', True)
+column_default_sort = ('job_id', True)
 form_choices = {
 'state': [
 ('success', 'success'),



[3/4] incubator-airflow git commit: Merge branch 'AIRFLOW-719' into AIRFLOW-719-3

2017-04-04 Thread bolke
Merge branch 'AIRFLOW-719' into AIRFLOW-719-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15fd4d98
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15fd4d98
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15fd4d98

Branch: refs/heads/master
Commit: 15fd4d98d141766f81552d270c8b5c43b15f4f44
Parents: f2dae7d eb705fd
Author: Bolke de Bruin 
Authored: Tue Apr 4 11:55:20 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 11:55:20 2017 +0200

--
 airflow/operators/latest_only_operator.py |  30 +++-
 airflow/operators/python_operator.py  |  82 +++---
 airflow/ti_deps/deps/trigger_rule_dep.py  |   6 +-
 scripts/ci/requirements.txt   |   1 +
 tests/dags/test_dagrun_short_circuit_false.py |  38 -
 tests/models.py   |  77 +-
 tests/operators/__init__.py   |   2 +
 tests/operators/latest_only_operator.py   |   2 +-
 tests/operators/python_operator.py| 167 -
 9 files changed, 301 insertions(+), 104 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15fd4d98/tests/models.py
--
diff --cc tests/models.py
index 43fccca,3e77894..a013f8a
--- a/tests/models.py
+++ b/tests/models.py
@@@ -223,59 -220,10 +220,43 @@@ class DagRunTest(unittest.TestCase)
  def test_id_for_date(self):
  run_id = models.DagRun.id_for_date(
  datetime.datetime(2015, 1, 2, 3, 4, 5, 6, None))
 -self.assertEqual('scheduled__2015-01-02T03:04:05', run_id,
 - msg='Generated run_id did not match expectations: 
{0}'
 - .format(run_id))
 +self.assertEqual(
 +'scheduled__2015-01-02T03:04:05', run_id,
 +'Generated run_id did not match expectations: {0}'.format(run_id))
 +
 +def test_dagrun_find(self):
 +session = settings.Session()
 +now = datetime.datetime.now()
 +
 +dag_id1 = "test_dagrun_find_externally_triggered"
 +dag_run = models.DagRun(
 +dag_id=dag_id1,
 +run_id='manual__' + now.isoformat(),
 +execution_date=now,
 +start_date=now,
 +state=State.RUNNING,
 +external_trigger=True,
 +)
 +session.add(dag_run)
 +
 +dag_id2 = "test_dagrun_find_not_externally_triggered"
 +dag_run = models.DagRun(
 +dag_id=dag_id2,
 +run_id='manual__' + now.isoformat(),
 +execution_date=now,
 +start_date=now,
 +state=State.RUNNING,
 +external_trigger=False,
 +)
 +session.add(dag_run)
 +
 +session.commit()
 +
 +self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, 
external_trigger=True)))
 +self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, 
external_trigger=False)))
 +self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, 
external_trigger=True)))
 +self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, 
external_trigger=False)))
  
- def test_dagrun_running_when_upstream_skipped(self):
- """
- Tests that a DAG run is not failed when an upstream task is skipped
- """
- initial_task_states = {
- 'test_short_circuit_false': State.SUCCESS,
- 'test_state_skipped1': State.SKIPPED,
- 'test_state_skipped2': State.NONE,
- }
- # dags/test_dagrun_short_circuit_false.py
- dag_run = self.create_dag_run('test_dagrun_short_circuit_false',
-   state=State.RUNNING,
-   task_states=initial_task_states)
- updated_dag_state = dag_run.update_state()
- self.assertEqual(State.RUNNING, updated_dag_state)
- 
  def test_dagrun_success_when_all_skipped(self):
  """
  Tests that a DAG run succeeds when all tasks are skipped



[4/4] incubator-airflow git commit: Merge pull request #2195 from bolkedebruin/AIRFLOW-719

2017-04-04 Thread bolke
Merge pull request #2195 from bolkedebruin/AIRFLOW-719


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4a6bef69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4a6bef69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4a6bef69

Branch: refs/heads/master
Commit: 4a6bef69d1817a5fc3ddd6ffe14c2578eaa49cf0
Parents: f2dae7d 15fd4d9
Author: Bolke de Bruin 
Authored: Tue Apr 4 17:04:12 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 17:04:12 2017 +0200

--
 airflow/operators/latest_only_operator.py |  30 +++-
 airflow/operators/python_operator.py  |  82 +++---
 airflow/ti_deps/deps/trigger_rule_dep.py  |   6 +-
 scripts/ci/requirements.txt   |   1 +
 tests/dags/test_dagrun_short_circuit_false.py |  38 -
 tests/models.py   |  77 +-
 tests/operators/__init__.py   |   2 +
 tests/operators/latest_only_operator.py   |   2 +-
 tests/operators/python_operator.py| 167 -
 9 files changed, 301 insertions(+), 104 deletions(-)
--




[1/4] incubator-airflow git commit: Revert "[AIRFLOW-719] Prevent DAGs from ending prematurely"

2017-04-04 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master f2dae7d15 -> 4a6bef69d


Revert "[AIRFLOW-719] Prevent DAGs from ending prematurely"

This reverts commit 1fdcf2480555f06cce3fc9bba97fbf3d64f074d3.

This reinstates the previous logic (< 1.8.0) that ALL_SUCCESS requires
all tasks to be successful instead of also counting SKIPPED
tasks as part of the successful tasks.
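
A small worked example of the difference, using hypothetical counts (3 upstream
tasks with trigger_rule=ALL_SUCCESS: 2 succeeded, 1 skipped):

upstream, successes, skipped = 3, 2, 1

# 1.8.0 behavior being reverted: SKIPPED counted toward success
num_failures = upstream - (successes + skipped)   # 0 -> dependency met

# reinstated (< 1.8.0) behavior: only real successes count
num_failures = upstream - successes               # 1 -> dependency not met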


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/92965e82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/92965e82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/92965e82

Branch: refs/heads/master
Commit: 92965e8275c6f2ec2282ad46c09950bab10c1cb2
Parents: 4c09050
Author: Bolke de Bruin 
Authored: Mon Mar 27 20:12:29 2017 -0700
Committer: Bolke de Bruin 
Committed: Tue Mar 28 17:42:48 2017 -0700

--
 airflow/ti_deps/deps/trigger_rule_dep.py  |  6 +-
 tests/dags/test_dagrun_short_circuit_false.py | 38 --
 tests/models.py   | 83 +++---
 3 files changed, 46 insertions(+), 81 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/airflow/ti_deps/deps/trigger_rule_dep.py
--
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py 
b/airflow/ti_deps/deps/trigger_rule_dep.py
index 3a77b00..cf06c0b 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -135,7 +135,7 @@ class TriggerRuleDep(BaseTIDep):
 if tr == TR.ALL_SUCCESS:
 if upstream_failed or failed:
 ti.set_state(State.UPSTREAM_FAILED, session)
-elif skipped == upstream:
+elif skipped:
 ti.set_state(State.SKIPPED, session)
 elif tr == TR.ALL_FAILED:
 if successes or skipped:
@@ -148,7 +148,7 @@ class TriggerRuleDep(BaseTIDep):
 ti.set_state(State.SKIPPED, session)
 
 if tr == TR.ONE_SUCCESS:
-if successes <= 0 and skipped <= 0:
+if successes <= 0:
 yield self._failing_status(
 reason="Task's trigger rule '{0}' requires one upstream "
 "task success, but none were found. "
@@ -162,7 +162,7 @@ class TriggerRuleDep(BaseTIDep):
 "upstream_tasks_state={1}, upstream_task_ids={2}"
 .format(tr, upstream_tasks_state, task.upstream_task_ids))
 elif tr == TR.ALL_SUCCESS:
-num_failures = upstream - (successes + skipped)
+num_failures = upstream - successes
 if num_failures > 0:
 yield self._failing_status(
 reason="Task's trigger rule '{0}' requires all upstream "

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/tests/dags/test_dagrun_short_circuit_false.py
--
diff --git a/tests/dags/test_dagrun_short_circuit_false.py 
b/tests/dags/test_dagrun_short_circuit_false.py
deleted file mode 100644
index 805ab67..000
--- a/tests/dags/test_dagrun_short_circuit_false.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from datetime import datetime
-
-from airflow.models import DAG
-from airflow.operators.python_operator import ShortCircuitOperator
-from airflow.operators.dummy_operator import DummyOperator
-
-
-# DAG that has its short circuit op fail and skip multiple downstream tasks
-dag = DAG(
-dag_id='test_dagrun_short_circuit_false',
-start_date=datetime(2017, 1, 1)
-)
-dag_task1 = ShortCircuitOperator(
-task_id='test_short_circuit_false',
-dag=dag,
-python_callable=lambda: False)
-dag_task2 = DummyOperator(
-task_id='test_state_skipped1',
-dag=dag)
-dag_task3 = DummyOperator(
-task_id='test_state_skipped2',
-dag=dag)
-dag_task1.set_downstream(dag_task2)
-dag_task2.set_downstream(dag_task3)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/92965e82/tests/models.py
--
diff --git 

[2/4] incubator-airflow git commit: [AIRFLOW-719] Fix race condition in ShortCircuit, Branch and LatestOnly

2017-04-04 Thread bolke
[AIRFLOW-719] Fix race condition in ShortCircuit, Branch and LatestOnly

The ShortCircuitOperator, BranchPythonOperator and LatestOnlyOperator
were all arbitrarily changing the states of TaskInstances without locking
them in the database. As the scheduler checks the state of dag runs
asynchronously, the dag run state could be set to failed while the
operators are updating the downstream tasks.

A better fix would be to use the dag run itself in the context of the
Operator.
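
Distilled into a standalone sketch (variable names are assumed to be in scope;
the actual change is in the diff below), the pattern is to select the downstream
TaskInstances with a row lock before flipping their state:

from airflow import settings
from airflow.models import TaskInstance as TI
from airflow.utils.state import State

session = settings.Session()
# execution_date and downstream_task_ids are assumed to be defined here.
tis = session.query(TI).filter(
    TI.execution_date == execution_date,
    TI.task_id.in_(downstream_task_ids),
).with_for_update().all()          # SELECT ... FOR UPDATE row lock

for ti in tis:
    ti.state = State.SKIPPED
    session.merge(ti)

session.commit()
session.close()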


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb705fd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb705fd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb705fd5

Branch: refs/heads/master
Commit: eb705fd55c30cea778282140d927f51b4a649c73
Parents: 92965e8
Author: Bolke de Bruin 
Authored: Tue Mar 28 16:29:39 2017 -0700
Committer: Bolke de Bruin 
Committed: Mon Apr 3 10:38:12 2017 +0200

--
 airflow/operators/latest_only_operator.py |  30 -
 airflow/operators/python_operator.py  |  82 +---
 scripts/ci/requirements.txt   |   1 +
 tests/operators/__init__.py   |   2 +
 tests/operators/latest_only_operator.py   |   2 +-
 tests/operators/python_operator.py| 167 -
 6 files changed, 258 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/airflow/operators/latest_only_operator.py
--
diff --git a/airflow/operators/latest_only_operator.py 
b/airflow/operators/latest_only_operator.py
index 8b4e614..9d5defb 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -34,7 +34,7 @@ class LatestOnlyOperator(BaseOperator):
 def execute(self, context):
 # If the DAG Run is externally triggered, then return without
 # skipping downstream tasks
-if context['dag_run'].external_trigger:
+if context['dag_run'] and context['dag_run'].external_trigger:
 logging.info("""Externally triggered DAG_Run:
  allowing execution to proceed.""")
 return
@@ -46,17 +46,39 @@ class LatestOnlyOperator(BaseOperator):
 logging.info(
 'Checking latest only with left_window: %s right_window: %s '
 'now: %s', left_window, right_window, now)
+
 if not left_window < now <= right_window:
 logging.info('Not latest execution, skipping downstream.')
 session = settings.Session()
-for task in context['task'].downstream_list:
-ti = TaskInstance(
-task, execution_date=context['ti'].execution_date)
+
+TI = TaskInstance
+tis = session.query(TI).filter(
+TI.execution_date == context['ti'].execution_date,
+TI.task_id.in_(context['task'].downstream_task_ids)
+).with_for_update().all()
+
+for ti in tis:
 logging.info('Skipping task: %s', ti.task_id)
 ti.state = State.SKIPPED
 ti.start_date = now
 ti.end_date = now
 session.merge(ti)
+
+# this is defensive against dag runs that are not complete
+for task in context['task'].downstream_list:
+if task.task_id in tis:
+continue
+
+logging.warning("Task {} was not part of a dag run. "
+"This should not happen."
+.format(task))
+now = datetime.datetime.now()
+ti = TaskInstance(task, 
execution_date=context['ti'].execution_date)
+ti.state = State.SKIPPED
+ti.start_date = now
+ti.end_date = now
+session.merge(ti)
+
 session.commit()
 session.close()
 logging.info('Done.')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb705fd5/airflow/operators/python_operator.py
--
diff --git a/airflow/operators/python_operator.py 
b/airflow/operators/python_operator.py
index a17e6fa..cf240f2 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -109,14 +109,36 @@ class BranchPythonOperator(PythonOperator):
 logging.info("Following branch " + branch)
 logging.info("Marking other directly downstream tasks as skipped")
 session = settings.Session()
+
+TI = TaskInstance
+tis = session.query(TI).filter(
+TI.execution_date == 

[jira] [Assigned] (AIRFLOW-1065) Add functionality for Azure Blob Storage

2017-04-04 Thread Henk Griffioen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henk Griffioen reassigned AIRFLOW-1065:
---

Assignee: Henk Griffioen

> Add functionality for Azure Blob Storage
> 
>
> Key: AIRFLOW-1065
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1065
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Henk Griffioen
>Assignee: Henk Griffioen
>
> Currently Airflow has sensors and operators for S3 and GCE but it does not 
> support Azure Blob Storage.
> A hook would interface with Azure Blob storage via the Python library 
> azure-storage over the wasb protocol. Sensors use the hook to detect if a 
> blob has landed on a container and operators use it to move files to the blob 
> storage.
> The design for the hook airflow.contrib.hooks.WasbHook would mimic 
> airflow.operators.S3_hook.S3Hook.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1065) Add functionality for Azure Blob Storage

2017-04-04 Thread Henk Griffioen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Henk Griffioen updated AIRFLOW-1065:

Description: 
Currently Airflow has sensors and operators for S3 and GCE but it does not 
support Azure Blob Storage.

A hook would interface with Azure Blob storage via the Python library 
azure-storage over the wasb protocol. Sensors use the hook to detect if a blob 
has landed on a container and operators use it to move files to the blob 
storage.

The design for the hook airflow.contrib.hooks.WasbHook would mimic 
airflow.operators.S3_hook.S3Hook.

  was:
Currently Airflow has sensors and operators for S3 and GCE but it does not 
support Azure Blob Storage.

A hook would interface with Azure Blob storage via the Python library 
azure-storage over the wasbs protocol. Sensors use the hook to detect if a blob 
has landed on a container and operators use it to move files to the blob 
storage.

The design for the hook airflow.contrib.hooks.WasbsHook would mimic 
airflow.operators.S3_hook.S3Hook.


> Add functionality for Azure Blob Storage
> 
>
> Key: AIRFLOW-1065
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1065
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Henk Griffioen
>
> Currently Airflow has sensors and operators for S3 and GCE but it does not 
> support Azure Blob Storage.
> A hook would interface with Azure Blob storage via the Python library 
> azure-storage over the wasb protocol. Sensors use the hook to detect if a 
> blob has landed on a container and operators use it to move files to the blob 
> storage.
> The design for the hook airflow.contrib.hooks.WasbHook would mimic 
> airflow.operators.S3_hook.S3Hook.
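
A minimal sketch of what such a hook might look like, assuming the pre-v12
azure-storage SDK (BlockBlobService) and mirroring the S3Hook design; all names
below are illustrative, not an actual Airflow hook:

{code}
from airflow.hooks.base_hook import BaseHook
from azure.storage.blob import BlockBlobService


class WasbHook(BaseHook):
    """Interacts with Azure Blob Storage over the wasb:// protocol."""

    def __init__(self, wasb_conn_id='wasb_default'):
        conn = self.get_connection(wasb_conn_id)
        # Assumed convention: the storage account name is stored in the
        # connection's login field and the account key in its password.
        self.connection = BlockBlobService(account_name=conn.login,
                                           account_key=conn.password)

    def check_for_blob(self, container_name, blob_name):
        # A sensor would poke this until the blob lands on the container.
        return self.connection.exists(container_name, blob_name)

    def load_file(self, file_path, container_name, blob_name):
        # An operator would use this to move a local file into blob storage.
        self.connection.create_blob_from_path(container_name, blob_name,
                                              file_path)
{code}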



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1065) Add functionality for Azure Blob Storage

2017-04-04 Thread Henk Griffioen (JIRA)
Henk Griffioen created AIRFLOW-1065:
---

 Summary: Add functionality for Azure Blob Storage
 Key: AIRFLOW-1065
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1065
 Project: Apache Airflow
  Issue Type: New Feature
Reporter: Henk Griffioen


Currently Airflow has sensors and operators for S3 and GCE but it does not 
support Azure Blob Storage.

A hook would interface with Azure Blob storage via the Python library 
azure-storage over the wasbs protocol. Sensors use the hook to detect if a blob 
has landed on a container and operators use it to move files to the blob 
storage.

The design for the hook airflow.contrib.hooks.WasbsHook would mimic 
airflow.operators.S3_hook.S3Hook.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (AIRFLOW-1030) HttpHook error when creating HttpSensor

2017-04-04 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin resolved AIRFLOW-1030.
-
   Resolution: Fixed
Fix Version/s: 1.8.1

Issue resolved by pull request #2180
[https://github.com/apache/incubator-airflow/pull/2180]

> HttpHook error when creating HttpSensor
> ---
>
> Key: AIRFLOW-1030
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1030
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: Airflow 1.8
> Environment: python3
>Reporter: Paulius Dambrauskas
>Assignee: Paulius Dambrauskas
> Fix For: 1.8.1
>
>
> Task:
> {code}
> sensor = HttpSensor(
> task_id='http_sensor_check',
> http_conn_id='http_default',
> endpoint='',
> params={},
> poke_interval=5,
> dag=dag
> )
> {code}
> Exception
> {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/models.py",
>  line 268, in process_file
> m = imp.load_source(mod_name, filepath)
>   File "/usr/lib/python3.5/imp.py", line 172, in load_source
> module = _load(spec)
>   File "", line 693, in _load
>   File "", line 673, in _load_unlocked
>   File "", line 665, in exec_module
>   File "", line 222, in _call_with_frames_removed
>   File "/home/paulius/airflow/dags/cpg_4.py", line 43, in 
> dag=dag)
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/utils/decorators.py",
>  line 86, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/operators/sensors.py",
>  line 663, in __init__
> self.hook = hooks.http_hook.HttpHook(method='GET', 
> http_conn_id=http_conn_id)
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/utils/helpers.py",
>  line 436, in __getattr__
> raise AttributeError
> AttributeError
> {code}
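
For context, the shape of the fix (a sketch, not the exact patch from #2180) is
to import HttpHook directly from its module instead of resolving it through the
lazily-populated hooks namespace, whose __getattr__ raises the AttributeError
seen above:

{code}
from airflow.hooks.http_hook import HttpHook

# Direct import sidesteps the lazy attribute lookup in airflow.utils.helpers.
hook = HttpHook(method='GET', http_conn_id='http_default')
{code}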



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1) Migrate GitHub code to Apache git

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954650#comment-15954650
 ] 

ASF subversion and git services commented on AIRFLOW-1:
---

Commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d in incubator-airflow's branch 
refs/heads/master from pdambrauskas
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=f2dae7d ]

[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

Closes #2180 from
pdambrauskas/fix/http_hook_import


> Migrate GitHub code to Apache git
> -
>
> Key: AIRFLOW-1
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: project-management
>Reporter: Maxime Beauchemin
>Assignee: Maxime Beauchemin
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1) Migrate GitHub code to Apache git

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954652#comment-15954652
 ] 

ASF subversion and git services commented on AIRFLOW-1:
---

Commit 4db53f39a972cae691dc49687a407dda0ff49aaf in incubator-airflow's branch 
refs/heads/v1-8-test from pdambrauskas
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4db53f3 ]

[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

Closes #2180 from
pdambrauskas/fix/http_hook_import

(cherry picked from commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d)
Signed-off-by: Bolke de Bruin 


> Migrate GitHub code to Apache git
> -
>
> Key: AIRFLOW-1
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: project-management
>Reporter: Maxime Beauchemin
>Assignee: Maxime Beauchemin
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1030) HttpHook error when creating HttpSensor

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954651#comment-15954651
 ] 

ASF subversion and git services commented on AIRFLOW-1030:
--

Commit 4db53f39a972cae691dc49687a407dda0ff49aaf in incubator-airflow's branch 
refs/heads/v1-8-test from pdambrauskas
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4db53f3 ]

[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

Closes #2180 from
pdambrauskas/fix/http_hook_import

(cherry picked from commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d)
Signed-off-by: Bolke de Bruin 


> HttpHook error when creating HttpSensor
> ---
>
> Key: AIRFLOW-1030
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1030
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: Airflow 1.8
> Environment: python3
>Reporter: Paulius Dambrauskas
>Assignee: Paulius Dambrauskas
>
> Task:
> {code}
> sensor = HttpSensor(
> task_id='http_sensor_check',
> http_conn_id='http_default',
> endpoint='',
> params={},
> poke_interval=5,
> dag=dag
> )
> {code}
> Exception
> {code}
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/models.py",
>  line 268, in process_file
> m = imp.load_source(mod_name, filepath)
>   File "/usr/lib/python3.5/imp.py", line 172, in load_source
> module = _load(spec)
>   File "", line 693, in _load
>   File "", line 673, in _load_unlocked
>   File "", line 665, in exec_module
>   File "", line 222, in _call_with_frames_removed
>   File "/home/paulius/airflow/dags/cpg_4.py", line 43, in 
> dag=dag)
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/utils/decorators.py",
>  line 86, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/operators/sensors.py",
>  line 663, in __init__
> self.hook = hooks.http_hook.HttpHook(method='GET', 
> http_conn_id=http_conn_id)
>   File 
> "/usr/local/lib/python3.5/dist-packages/airflow-1.9.0.dev0+apache.incubating-py3.5.egg/airflow/utils/helpers.py",
>  line 436, in __getattr__
> raise AttributeError
> AttributeError
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (AIRFLOW-1051) Add a test for resetdb to CliTests

2017-04-04 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin resolved AIRFLOW-1051.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Issue resolved by pull request #2198
[https://github.com/apache/incubator-airflow/pull/2198]

> Add a test for resetdb to CliTests
> --
>
> Key: AIRFLOW-1051
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1051
> Project: Apache Airflow
>  Issue Type: Test
>  Components: tests
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Minor
> Fix For: 1.9.0
>
>
> CliTests currently lacks a test for the resetdb command. It should be added.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1051) Add a test for resetdb to CliTests

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954642#comment-15954642
 ] 

ASF subversion and git services commented on AIRFLOW-1051:
--

Commit 15aee05dd7104716b22ea7b01b220f9eaea3a72a in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=15aee05 ]

[AIRFLOW-1051] Add a test for resetdb to CliTests

CliTests currently lacks a test for the resetdb command.
It should be added.

Closes #2198 from sekikn/AIRFLOW-1051


> Add a test for resetdb to CliTests
> --
>
> Key: AIRFLOW-1051
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1051
> Project: Apache Airflow
>  Issue Type: Test
>  Components: tests
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Minor
>
> CliTests currently lacks a test for the resetdb command. It should be added.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1051] Add a test for resetdb to CliTests

2017-04-04 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master a9b20a04b -> 15aee05dd


[AIRFLOW-1051] Add a test for resetdb to CliTests

CliTests currently lacks a test for the resetdb command.
It should be added.

Closes #2198 from sekikn/AIRFLOW-1051


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15aee05d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15aee05d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15aee05d

Branch: refs/heads/master
Commit: 15aee05dd7104716b22ea7b01b220f9eaea3a72a
Parents: a9b20a0
Author: Kengo Seki 
Authored: Tue Apr 4 08:37:08 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:37:08 2017 +0200

--
 tests/core.py | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15aee05d/tests/core.py
--
diff --git a/tests/core.py b/tests/core.py
index 7da08e1..8b3d1b8 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1089,6 +1089,9 @@ class CliTests(unittest.TestCase):
 def test_cli_initdb(self):
 cli.initdb(self.parser.parse_args(['initdb']))
 
+def test_cli_resetdb(self):
+cli.resetdb(self.parser.parse_args(['resetdb', '--yes']))
+
 def test_cli_connections_list(self):
 with mock.patch('sys.stdout',
 new_callable=six.StringIO) as mock_stdout:



[jira] [Commented] (AIRFLOW-1051) Add a test for resetdb to CliTests

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954643#comment-15954643
 ] 

ASF subversion and git services commented on AIRFLOW-1051:
--

Commit 15aee05dd7104716b22ea7b01b220f9eaea3a72a in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=15aee05 ]

[AIRFLOW-1051] Add a test for resetdb to CliTests

CliTests currently lacks a test for the resetdb command.
It should be added.

Closes #2198 from sekikn/AIRFLOW-1051


> Add a test for resetdb to CliTests
> --
>
> Key: AIRFLOW-1051
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1051
> Project: Apache Airflow
>  Issue Type: Test
>  Components: tests
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Minor
>
> CliTests currently lacks a test for the resetdb command. It should be added.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (AIRFLOW-1004) `airflow webserver -D` runs in foreground

2017-04-04 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin resolved AIRFLOW-1004.
-
Resolution: Fixed

> `airflow webserver -D` runs in foreground
> -
>
> Key: AIRFLOW-1004
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1004
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Reporter: Ruslan Dautkhanov
>Assignee: Kengo Seki
>  Labels: background, restart, rolling, webserver
> Fix For: 1.8.1
>
>
> airflow webserver doesn't want to daemonize
> {noformat}
> $ airflow webserver --daemon
> [2017-03-17 00:06:37,553] {__init__.py:57} INFO - Using executor LocalExecutor
> .. skip ..
> Running the Gunicorn Server with:
> Workers: 4 sync
> Host: 0.0.0.0:18111
> Timeout: 120
> Logfiles: - -
> =
> [2017-03-17 00:06:39,744] {__init__.py:57} INFO - Using executor LocalExecutor
> {noformat}
> webserver keeps running in the foreground.
> I sent an email regarding this issue to the dev list and, according to [~bolke],
> "This is a (known) bug, since the introduction of the rolling restarts".



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-276) List of dags does not refresh in UI for a while

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954636#comment-15954636
 ] 

ASF subversion and git services commented on AIRFLOW-276:
-

Commit a9b20a04b052e9479dbb79fd46124293085610e9 in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a9b20a0 ]

[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for
gunicorn
to find new files in the dag folder, but it also
changed
`airflow webserver -D`'s behavior to run in
foreground.
This PR fixes that by running the monitor as a
daemon process.

Closes #2208 from sekikn/AIRFLOW-1004


> List of dags does not refresh in UI for a while
> ---
>
> Key: AIRFLOW-276
> URL: https://issues.apache.org/jira/browse/AIRFLOW-276
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Li Xuanji
>Assignee: Li Xuanji
>Priority: Minor
>
> After creating a new dag (e.g. by adding a file to `~/airflow/dags`), the web
> UI does not show the new dag for a while. It only shows it when either
> 1. gunicorn decides to restart the worker process, or
> 2. a scheduler picks up the new dag, adds it to the airflow db, and the web 
> UI notices it in the db



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1004) `airflow webserver -D` runs in foreground

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954637#comment-15954637
 ] 

ASF subversion and git services commented on AIRFLOW-1004:
--

Commit a9b20a04b052e9479dbb79fd46124293085610e9 in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a9b20a0 ]

[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for
gunicorn
to find new files in the dag folder, but it also
changed
`airflow webserver -D`'s behavior to run in
foreground.
This PR fixes that by running the monitor as a
daemon process.

Closes #2208 from sekikn/AIRFLOW-1004


> `airflow webserver -D` runs in foreground
> -
>
> Key: AIRFLOW-1004
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1004
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Reporter: Ruslan Dautkhanov
>Assignee: Kengo Seki
>  Labels: background, restart, rolling, webserver
> Fix For: 1.8.1
>
>
> airflow webserver doesn't want to daemonize
> {noformat}
> $ airflow webserver --daemon
> [2017-03-17 00:06:37,553] {__init__.py:57} INFO - Using executor LocalExecutor
> .. skip ..
> Running the Gunicorn Server with:
> Workers: 4 sync
> Host: 0.0.0.0:18111
> Timeout: 120
> Logfiles: - -
> =
> [2017-03-17 00:06:39,744] {__init__.py:57} INFO - Using executor LocalExecutor
> {noformat}
> webserver keeps running in the foreground.
> I sent an email regarding this issue to the dev list and, according to [~bolke],
> "This is a (known) bug, since the introduction of the rolling restarts".



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-276) List of dags does not refresh in UI for a while

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954635#comment-15954635
 ] 

ASF subversion and git services commented on AIRFLOW-276:
-

Commit a9b20a04b052e9479dbb79fd46124293085610e9 in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a9b20a0 ]

[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for
gunicorn
to find new files in the dag folder, but it also
changed
`airflow webserver -D`'s behavior to run in
foreground.
This PR fixes that by running the monitor as a
daemon process.

Closes #2208 from sekikn/AIRFLOW-1004


> List of dags does not refresh in UI for a while
> ---
>
> Key: AIRFLOW-276
> URL: https://issues.apache.org/jira/browse/AIRFLOW-276
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Li Xuanji
>Assignee: Li Xuanji
>Priority: Minor
>
> After creating a new dag (e.g. by adding a file to `~/airflow/dags`), the web
> UI does not show the new dag for a while. It only shows it when either
> 1. gunicorn decides to restart the worker process, or
> 2. a scheduler picks up the new dag, adds it to the airflow db, and the web 
> UI notices it in the db



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1004) `airflow webserver -D` runs in foreground

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1004?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954634#comment-15954634
 ] 

ASF subversion and git services commented on AIRFLOW-1004:
--

Commit a9b20a04b052e9479dbb79fd46124293085610e9 in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a9b20a0 ]

[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for
gunicorn
to find new files in the dag folder, but it also
changed
`airflow webserver -D`'s behavior to run in
foreground.
This PR fixes that by running the monitor as a
daemon process.

Closes #2208 from sekikn/AIRFLOW-1004


> `airflow webserver -D` runs in foreground
> -
>
> Key: AIRFLOW-1004
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1004
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Reporter: Ruslan Dautkhanov
>Assignee: Kengo Seki
>  Labels: background, restart, rolling, webserver
> Fix For: 1.8.1
>
>
> airflow webserver doesn't want to daemonize
> {noformat}
> $ airflow webserver --daemon
> [2017-03-17 00:06:37,553] {__init__.py:57} INFO - Using executor LocalExecutor
> .. skip ..
> Running the Gunicorn Server with:
> Workers: 4 sync
> Host: 0.0.0.0:18111
> Timeout: 120
> Logfiles: - -
> =
> [2017-03-17 00:06:39,744] {__init__.py:57} INFO - Using executor LocalExecutor
> {noformat}
> webserver keeps running in the foreground.
> I sent an email regarding this issue to the dev list and, according to [~bolke],
> "This is a (known) bug, since the introduction of the rolling restarts".



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

2017-04-04 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master e4494f85e -> a9b20a04b


[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for gunicorn to find new files
in the dag folder, but it also changed `airflow webserver -D`'s behavior
to run in foreground. This PR fixes that by running the monitor as a
daemon process.

Closes #2208 from sekikn/AIRFLOW-1004


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9b20a04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9b20a04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9b20a04

Branch: refs/heads/master
Commit: a9b20a04b052e9479dbb79fd46124293085610e9
Parents: e4494f8
Author: Kengo Seki 
Authored: Tue Apr 4 08:32:44 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:32:44 2017 +0200

--
 airflow/bin/cli.py | 64 -
 tests/core.py  | 56 +++
 2 files changed, 109 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a9b20a04/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e9c54e6..e4755c7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -753,7 +753,12 @@ def webserver(args):
 app.run(debug=True, port=args.port, host=args.hostname,
 ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else 
None)
 else:
-pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
+pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
+if args.daemon:
+handle = setup_logging(log_file)
+stdout = open(stdout, 'w+')
+stderr = open(stderr, 'w+')
+
 print(
 textwrap.dedent('''\
 Running the Gunicorn Server with:
@@ -771,7 +776,6 @@ def webserver(args):
 '-t', str(worker_timeout),
 '-b', args.hostname + ':' + str(args.port),
 '-n', 'airflow-webserver',
-'-p', str(pid),
 '-c', 'airflow.www.gunicorn_config'
 ]
 
@@ -782,28 +786,66 @@ def webserver(args):
 run_args += ['--error-logfile', str(args.error_logfile)]
 
 if args.daemon:
-run_args += ["-D"]
+run_args += ['-D', '-p', str(pid)]
+
 if ssl_cert:
 run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
 
 run_args += ["airflow.www.app:cached_app()"]
 
-gunicorn_master_proc = subprocess.Popen(run_args)
+gunicorn_master_proc = None
 
 def kill_proc(dummy_signum, dummy_frame):
 gunicorn_master_proc.terminate()
 gunicorn_master_proc.wait()
 sys.exit(0)
 
-signal.signal(signal.SIGINT, kill_proc)
-signal.signal(signal.SIGTERM, kill_proc)
+def monitor_gunicorn(gunicorn_master_proc):
+# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+if conf.getint('webserver', 'worker_refresh_interval') > 0:
+restart_workers(gunicorn_master_proc, num_workers)
+else:
+while True:
+time.sleep(1)
 
-# These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-if conf.getint('webserver', 'worker_refresh_interval') > 0:
-restart_workers(gunicorn_master_proc, num_workers)
+if args.daemon:
+base, ext = os.path.splitext(pid)
+ctx = daemon.DaemonContext(
+pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+files_preserve=[handle],
+stdout=stdout,
+stderr=stderr,
+signal_map={
+signal.SIGINT: kill_proc,
+signal.SIGTERM: kill_proc
+},
+)
+with ctx:
+subprocess.Popen(run_args)
+
+# Reading pid file directly, since Popen#pid doesn't
+# seem to return the right value with DaemonContext.
+while True:
+try:
+with open(pid) as f:
+gunicorn_master_proc_pid = int(f.read())
+break
+except IOError:
+logging.debug("Waiting for gunicorn's pid file to be 
created.")
+time.sleep(0.1)
+
+gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+ 
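
The last hunk above polls for gunicorn's pid file because, as its comment
notes, Popen#pid doesn't return the right value when the process is
launched under DaemonContext. A self-contained sketch of that
wait-then-attach pattern (the function name and error handling are
illustrative, not lifted from cli.py):

{code}
import logging
import time

import psutil


def wait_for_pid_file(pid_path, poll_interval=0.1):
    # Poll until the daemonized process has written its pid file, then
    # return a psutil.Process handle so the caller can monitor/signal it.
    while True:
        try:
            with open(pid_path) as f:
                pid = int(f.read().strip())
            break
        except (IOError, ValueError):
            # Not created yet, or only partially written; retry shortly.
            logging.debug("Waiting for pid file %s to be created.", pid_path)
            time.sleep(poll_interval)
    return psutil.Process(pid)
{code}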

[jira] [Commented] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954629#comment-15954629
 ] 

ASF subversion and git services commented on AIRFLOW-1062:
--

Commit 010b80aa8b417091705556a07d5970fe0cc4efb2 in incubator-airflow's branch 
refs/heads/v1-8-test from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=010b80a ]

[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified,
because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062

(cherry picked from commit e4494f85ed5593c99949b52e1e0044c2a35f097f)
Signed-off-by: Bolke de Bruin 
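
The root cause is plain Python truthiness: `if external_trigger:` is
falsy both for the default (None, meaning "don't filter") and for an
explicit False, so the filter was silently dropped in the latter case. A
minimal sketch of the fixed behaviour (a simplified stand-in, not the
real DagRun.find signature):

{code}
def find(external_trigger=None, runs=()):
    results = list(runs)
    # The buggy version used `if external_trigger:`, which treats an
    # explicit False the same as the default None and skips the filter.
    if external_trigger is not None:
        results = [r for r in results if r['external_trigger'] == external_trigger]
    return results


runs = [{'run_id': 'manual__2017-04-03T01:56:15', 'external_trigger': True}]
assert find(external_trigger=False, runs=runs) == []     # fixed: filter applied
assert find(external_trigger=None, runs=runs) == runs    # default: no filter
{code}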


> DagRun#find returns wrong result if external_trigger=False is specified
> ---
>
> Key: AIRFLOW-1062
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1062
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: models
>Reporter: Kengo Seki
>Assignee: Kengo Seki
> Fix For: 1.8.1
>
>
> Given the following record,
> {code}
> sqlite> select id, external_trigger from dag_run;
> 1|1
> sqlite>
> {code}
> the following code should return no result,
> {code}
> In [1]: from airflow import models
> In [2]: models.DagRun.find(external_trigger=False)
> {code}
> ... but an externally-triggered record is returned erroneously.
> {code}
> Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified

2017-04-04 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin resolved AIRFLOW-1062.
-
   Resolution: Fixed
Fix Version/s: 1.8.1

Issue resolved by pull request #2210
[https://github.com/apache/incubator-airflow/pull/2210]

> DagRun#find returns wrong result if external_trigger=False is specified
> ---
>
> Key: AIRFLOW-1062
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1062
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: models
>Reporter: Kengo Seki
>Assignee: Kengo Seki
> Fix For: 1.8.1
>
>
> Given the following record,
> {code}
> sqlite> select id, external_trigger from dag_run;
> 1|1
> sqlite>
> {code}
> the following code should return no result,
> {code}
> In [1]: from airflow import models
> In [2]: models.DagRun.find(external_trigger=False)
> {code}
> ... but an externally-triggered record is returned erroneously.
> {code}
> Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954628#comment-15954628
 ] 

ASF subversion and git services commented on AIRFLOW-1062:
--

Commit e4494f85ed5593c99949b52e1e0044c2a35f097f in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e4494f8 ]

[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified,
because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062


> DagRun#find returns wrong result if external_trigger=False is specified
> ---
>
> Key: AIRFLOW-1062
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1062
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: models
>Reporter: Kengo Seki
>Assignee: Kengo Seki
> Fix For: 1.8.1
>
>
> Given the following record,
> {code}
> sqlite> select id, external_trigger from dag_run;
> 1|1
> sqlite>
> {code}
> the following code should return no result,
> {code}
> In [1]: from airflow import models
> In [2]: models.DagRun.find(external_trigger=False)
> {code}
> ... but an externally-triggered record is returned erroneously.
> {code}
> Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1062] Fix DagRun#find to return correct result

2017-04-04 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 2bebeaf95 -> 010b80aa8


[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified,
because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062

(cherry picked from commit e4494f85ed5593c99949b52e1e0044c2a35f097f)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/010b80aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/010b80aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/010b80aa

Branch: refs/heads/v1-8-test
Commit: 010b80aa8b417091705556a07d5970fe0cc4efb2
Parents: 2bebeaf
Author: Kengo Seki 
Authored: Tue Apr 4 08:30:40 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:31:05 2017 +0200

--
 airflow/models.py |  2 +-
 tests/models.py   | 33 +
 2 files changed, 34 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/010b80aa/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index fdff54e..6828ab6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3925,7 +3925,7 @@ class DagRun(Base):
 qry = qry.filter(DR.execution_date == execution_date)
 if state:
 qry = qry.filter(DR.state == state)
-if external_trigger:
+if external_trigger is not None:
 qry = qry.filter(DR.external_trigger == external_trigger)
 
 dr = qry.order_by(DR.execution_date).all()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/010b80aa/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index c63c67e..6673c04 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -227,6 +227,39 @@ class DagRunTest(unittest.TestCase):
 'scheduled__2015-01-02T03:04:05', run_id,
 'Generated run_id did not match expectations: {0}'.format(run_id))
 
+def test_dagrun_find(self):
+session = settings.Session()
+now = datetime.datetime.now()
+
+dag_id1 = "test_dagrun_find_externally_triggered"
+dag_run = models.DagRun(
+dag_id=dag_id1,
+run_id='manual__' + now.isoformat(),
+execution_date=now,
+start_date=now,
+state=State.RUNNING,
+external_trigger=True,
+)
+session.add(dag_run)
+
+dag_id2 = "test_dagrun_find_not_externally_triggered"
+dag_run = models.DagRun(
+dag_id=dag_id2,
+run_id='manual__' + now.isoformat(),
+execution_date=now,
+start_date=now,
+state=State.RUNNING,
+external_trigger=False,
+)
+session.add(dag_run)
+
+session.commit()
+
+self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
+self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
+self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
+self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
+
 def test_dagrun_running_when_upstream_skipped(self):
 """
 Tests that a DAG run is not failed when an upstream task is skipped



[jira] [Commented] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954630#comment-15954630
 ] 

ASF subversion and git services commented on AIRFLOW-1062:
--

Commit 010b80aa8b417091705556a07d5970fe0cc4efb2 in incubator-airflow's branch 
refs/heads/v1-8-test from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=010b80a ]

[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified,
because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062

(cherry picked from commit e4494f85ed5593c99949b52e1e0044c2a35f097f)
Signed-off-by: Bolke de Bruin 


> DagRun#find returns wrong result if external_trigger=False is specified
> ---
>
> Key: AIRFLOW-1062
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1062
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: models
>Reporter: Kengo Seki
>Assignee: Kengo Seki
> Fix For: 1.8.1
>
>
> Given the following record,
> {code}
> sqlite> select id, external_trigger from dag_run;
> 1|1
> sqlite>
> {code}
> the following code should return no result,
> {code}
> In [1]: from airflow import models
> In [2]: models.DagRun.find(external_trigger=False)
> {code}
> ... but an externally-triggered record is returned erroneously.
> {code}
> Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1062) DagRun#find returns wrong result if external_trigger=False is specified

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954627#comment-15954627
 ] 

ASF subversion and git services commented on AIRFLOW-1062:
--

Commit e4494f85ed5593c99949b52e1e0044c2a35f097f in incubator-airflow's branch 
refs/heads/master from [~sekikn]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e4494f8 ]

[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified,
because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062


> DagRun#find returns wrong result if external_trigger=False is specified
> ---
>
> Key: AIRFLOW-1062
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1062
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: models
>Reporter: Kengo Seki
>Assignee: Kengo Seki
> Fix For: 1.8.1
>
>
> Given the following record,
> {code}
> sqlite> select id, external_trigger from dag_run;
> 1|1
> sqlite>
> {code}
> the following code should return no result,
> {code}
> In [1]: from airflow import models
> In [2]: models.DagRun.find(external_trigger=False)
> {code}
> ... but an externally-triggered record is returned erroneously.
> {code}
> Out[2]: [ manual__2017-04-03T01:56:15, externally triggered: True>]
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1062] Fix DagRun#find to return correct result

2017-04-04 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 56501e606 -> e4494f85e


[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns wrong result if external_trigger=False is specified,
because adding filter is skipped on that condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e4494f85
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e4494f85
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e4494f85

Branch: refs/heads/master
Commit: e4494f85ed5593c99949b52e1e0044c2a35f097f
Parents: 56501e6
Author: Kengo Seki 
Authored: Tue Apr 4 08:30:40 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:30:40 2017 +0200

--
 airflow/models.py |  2 +-
 tests/models.py   | 33 +
 2 files changed, 34 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4494f85/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 5835578..7171c05 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3969,7 +3969,7 @@ class DagRun(Base):
 qry = qry.filter(DR.execution_date == execution_date)
 if state:
 qry = qry.filter(DR.state == state)
-if external_trigger:
+if external_trigger is not None:
 qry = qry.filter(DR.external_trigger == external_trigger)
 
 dr = qry.order_by(DR.execution_date).all()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e4494f85/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index dcba354..43fccca 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -227,6 +227,39 @@ class DagRunTest(unittest.TestCase):
 'scheduled__2015-01-02T03:04:05', run_id,
 'Generated run_id did not match expectations: {0}'.format(run_id))
 
+def test_dagrun_find(self):
+session = settings.Session()
+now = datetime.datetime.now()
+
+dag_id1 = "test_dagrun_find_externally_triggered"
+dag_run = models.DagRun(
+dag_id=dag_id1,
+run_id='manual__' + now.isoformat(),
+execution_date=now,
+start_date=now,
+state=State.RUNNING,
+external_trigger=True,
+)
+session.add(dag_run)
+
+dag_id2 = "test_dagrun_find_not_externally_triggered"
+dag_run = models.DagRun(
+dag_id=dag_id2,
+run_id='manual__' + now.isoformat(),
+execution_date=now,
+start_date=now,
+state=State.RUNNING,
+external_trigger=False,
+)
+session.add(dag_run)
+
+session.commit()
+
+self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True)))
+self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False)))
+self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True)))
+self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False)))
+
 def test_dagrun_running_when_upstream_skipped(self):
 """
 Tests that a DAG run is not failed when an upstream task is skipped



[jira] [Commented] (AIRFLOW-1011) Fix bug in BackfillJob._execute() for SubDAGs

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954623#comment-15954623
 ] 

ASF subversion and git services commented on AIRFLOW-1011:
--

Commit 2bebeaf9554d35710de6eb1b4006157e105ac79b in incubator-airflow's branch 
refs/heads/v1-8-test from [~jschmid]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=2bebeaf ]

[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or
equal to the end date before creating a DAG run and task instances. For
SubDAGs, the next run date is not relevant, i.e. schedule_interval can
be anything other than None or '@once' and should be ignored. However,
the current code calculates the next run date for a SubDAG and the
condition check mentioned above always fails for a SubDAG triggered
manually.

This change adds a simple check to determine if this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

(cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc)
Signed-off-by: Bolke de Bruin 
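
In other words, the backfill's `while next_run_date <= end_date` loop
never started for a manually triggered SubDAG, because normalize_schedule()
pushed the first run date past the requested end date. A rough
illustration of the guard added above (normalize_schedule is mocked here
as an @daily schedule; only the conditional mirrors the real patch):

{code}
from datetime import datetime, timedelta


def normalize_schedule(dt):
    # Mocked stand-in for DAG.normalize_schedule with an @daily schedule:
    # snap an off-schedule start date forward to the next midnight.
    return datetime(dt.year, dt.month, dt.day) + timedelta(days=1)


def first_run_date(dr_start_date, is_subdag):
    # Same conditional as the patch: a SubDAG ignores schedule_interval,
    # so its first run date is simply the DAG run's start date.
    return normalize_schedule(dr_start_date) if not is_subdag else dr_start_date


now = datetime.now()
end_date = now  # a manual trigger backfills a single point in time
# Before the fix the SubDAG branch also went through normalize_schedule(),
# so next_run_date > end_date and no DAG run was ever created.
assert first_run_date(now, is_subdag=True) <= end_date
{code}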


> Fix bug in BackfillJob._execute() for SubDAGs
> -
>
> Key: AIRFLOW-1011
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1011
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: backfill, subdag
>Affects Versions: 1.8.0
>Reporter: Joe Schmid
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 1-TopLevelDAGTaskInstancesShownCorrectly.png, 
> 2-ZoomedSubDAG-NoTaskInstances-v1.8.png, 
> 3-ZoomedSubDAG-TaskInstances-v1.7.1.3.png, subdag_task_instance_logs.txt, 
> test_subdag.py
>
>
> The attached test SubDAG is not executed when the parent DAG is triggered 
> manually. Attached is a simple test DAG that exhibits the issue along with 
> screenshots showing the UI differences between v1.8 and v1.7.1.3.
> Note that if the DAG is run via backfill from command line (e.g. "airflow 
> backfill Test_SubDAG -s 2017-03-18 -e 2017-03-18") the task instances show up 
> successfully.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1011) Fix bug in BackfillJob._execute() for SubDAGs

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954624#comment-15954624
 ] 

ASF subversion and git services commented on AIRFLOW-1011:
--

Commit 2bebeaf9554d35710de6eb1b4006157e105ac79b in incubator-airflow's branch 
refs/heads/v1-8-test from [~jschmid]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=2bebeaf ]

[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or
equal to the end date before creating a DAG run and task instances. For
SubDAGs, the next run date is not relevant, i.e. schedule_interval can
be anything other than None or '@once' and should be ignored. However,
the current code calculates the next run date for a SubDAG and the
condition check mentioned above always fails for a SubDAG triggered
manually.

This change adds a simple check to determine if this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

(cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc)
Signed-off-by: Bolke de Bruin 


> Fix bug in BackfillJob._execute() for SubDAGs
> -
>
> Key: AIRFLOW-1011
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1011
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: backfill, subdag
>Affects Versions: 1.8.0
>Reporter: Joe Schmid
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 1-TopLevelDAGTaskInstancesShownCorrectly.png, 
> 2-ZoomedSubDAG-NoTaskInstances-v1.8.png, 
> 3-ZoomedSubDAG-TaskInstances-v1.7.1.3.png, subdag_task_instance_logs.txt, 
> test_subdag.py
>
>
> The attached test SubDAG is not executed when the parent DAG is triggered 
> manually. Attached is a simple test DAG that exhibits the issue along with 
> screenshots showing the UI differences between v1.8 and v1.7.1.3.
> Note that if the DAG is run via backfill from command line (e.g. "airflow 
> backfill Test_SubDAG -s 2017-03-18 -e 2017-03-18") the task instances show up 
> successfully.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Resolved] (AIRFLOW-1011) Fix bug in BackfillJob._execute() for SubDAGs

2017-04-04 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin resolved AIRFLOW-1011.
-
Resolution: Fixed

Issue resolved by pull request #2179
[https://github.com/apache/incubator-airflow/pull/2179]

> Fix bug in BackfillJob._execute() for SubDAGs
> -
>
> Key: AIRFLOW-1011
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1011
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: backfill, subdag
>Affects Versions: 1.8.0
>Reporter: Joe Schmid
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 1-TopLevelDAGTaskInstancesShownCorrectly.png, 
> 2-ZoomedSubDAG-NoTaskInstances-v1.8.png, 
> 3-ZoomedSubDAG-TaskInstances-v1.7.1.3.png, subdag_task_instance_logs.txt, 
> test_subdag.py
>
>
> The attached test SubDAG is not executed when the parent DAG is triggered 
> manually. Attached is a simple test DAG that exhibits the issue along with 
> screenshots showing the UI differences between v1.8 and v1.7.1.3.
> Note that if the DAG is run via backfill from command line (e.g. "airflow 
> backfill Test_SubDAG -s 2017-03-18 -e 2017-03-18") the task instances show up 
> successfully.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1011) Fix bug in BackfillJob._execute() for SubDAGs

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954620#comment-15954620
 ] 

ASF subversion and git services commented on AIRFLOW-1011:
--

Commit 56501e6062df9456f7ac4efe94e21940734dd5bc in incubator-airflow's branch 
refs/heads/master from [~jschmid]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=56501e6 ]

[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or
equal to the end date before creating a DAG run and task instances. For
SubDAGs, the next run date is not relevant, i.e. schedule_interval can
be anything other than None or '@once' and should be ignored. However,
the current code calculates the next run date for a SubDAG and the
condition check mentioned above always fails for a SubDAG triggered
manually.

This change adds a simple check to determine if this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags


> Fix bug in BackfillJob._execute() for SubDAGs
> -
>
> Key: AIRFLOW-1011
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1011
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: backfill, subdag
>Affects Versions: 1.8.0
>Reporter: Joe Schmid
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 1-TopLevelDAGTaskInstancesShownCorrectly.png, 
> 2-ZoomedSubDAG-NoTaskInstances-v1.8.png, 
> 3-ZoomedSubDAG-TaskInstances-v1.7.1.3.png, subdag_task_instance_logs.txt, 
> test_subdag.py
>
>
> The attached test SubDAG is not executed when the parent DAG is triggered 
> manually. Attached is a simple test DAG that exhibits the issue along with 
> screenshots showing the UI differences between v1.8 and v1.7.1.3.
> Note that if the DAG is run via backfill from command line (e.g. "airflow 
> backfill Test_SubDAG -s 2017-03-18 -e 2017-03-18") the task instances show up 
> successfully.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1011) Fix bug in BackfillJob._execute() for SubDAGs

2017-04-04 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954621#comment-15954621
 ] 

ASF subversion and git services commented on AIRFLOW-1011:
--

Commit 56501e6062df9456f7ac4efe94e21940734dd5bc in incubator-airflow's branch 
refs/heads/master from [~jschmid]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=56501e6 ]

[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or
equal to the end date before creating a DAG run and task instances. For
SubDAGs, the next run date is not relevant, i.e. schedule_interval can
be anything other than None or '@once' and should be ignored. However,
the current code calculates the next run date for a SubDAG and the
condition check mentioned above always fails for a SubDAG triggered
manually.

This change adds a simple check to determine if this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags


> Fix bug in BackfillJob._execute() for SubDAGs
> -
>
> Key: AIRFLOW-1011
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1011
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: backfill, subdag
>Affects Versions: 1.8.0
>Reporter: Joe Schmid
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 1-TopLevelDAGTaskInstancesShownCorrectly.png, 
> 2-ZoomedSubDAG-NoTaskInstances-v1.8.png, 
> 3-ZoomedSubDAG-TaskInstances-v1.7.1.3.png, subdag_task_instance_logs.txt, 
> test_subdag.py
>
>
> The attached test SubDAG is not executed when the parent DAG is triggered 
> manually. Attached is a simple test DAG that exhibits the issue along with 
> screenshots showing the UI differences between v1.8 and v1.7.1.3.
> Note that if the DAG is run via backfill from command line (e.g. "airflow 
> backfill Test_SubDAG -s 2017-03-18 -e 2017-03-18") the task instances show up 
> successfully.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

2017-04-04 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 75addb4a9 -> 56501e606


[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or
equal to the end date before creating a DAG run and task instances. For
SubDAGs, the next run date is not relevant, i.e. schedule_interval can
be anything other than None or '@once' and should be ignored. However,
the current code calculates the next run date for a SubDAG and the
condition check mentioned above always fails for a SubDAG triggered
manually.

This change adds a simple check to determine if this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/56501e60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/56501e60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/56501e60

Branch: refs/heads/master
Commit: 56501e6062df9456f7ac4efe94e21940734dd5bc
Parents: 75addb4
Author: Joe Schmid 
Authored: Tue Apr 4 08:27:45 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:27:45 2017 +0200

--
 airflow/jobs.py   |  7 +--
 airflow/models.py |  1 +
 tests/jobs.py | 28 
 3 files changed, 34 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 006a180..b5c2d5d 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob):
 
 # consider max_active_runs but ignore when running subdags
 # "parent.child" as a dag_id is by convention a subdag
-if self.dag.schedule_interval and "." not in self.dag.dag_id:
+if self.dag.schedule_interval and not self.dag.is_subdag:
 active_runs = DagRun.find(
 dag_id=self.dag.dag_id,
 state=State.RUNNING,
@@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob):
 
 # create dag runs
 dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
-next_run_date = self.dag.normalize_schedule(dr_start_date)
 end_date = end_date or datetime.now()
+# next run date for a subdag isn't relevant (schedule_interval for subdags
+# is ignored) so we use the dag run's start date in the case of a subdag
+next_run_date = (self.dag.normalize_schedule(dr_start_date)
+ if not self.dag.is_subdag else dr_start_date)
 
 active_dag_runs = []
 while next_run_date and next_run_date <= end_date:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 9d560fb..5835578 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2716,6 +2716,7 @@ class DAG(BaseDag, LoggingMixin):
 self.default_view = default_view
 self.orientation = orientation
 self.catchup = catchup
+self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate
 
 self.partial = False
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/56501e60/tests/jobs.py
--
diff --git a/tests/jobs.py b/tests/jobs.py
index c1d6790..3eb407b 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase):
 else:
 self.assertEqual(State.NONE, ti.state)
 
+def test_backfill_execute_subdag(self):
+dag = self.dagbag.get_dag('example_subdag_operator')
+subdag_op_task = dag.get_task('section-1')
+
+subdag = subdag_op_task.subdag
+subdag.schedule_interval = '@daily'
+
+start_date = datetime.datetime.now()
+executor = TestExecutor(do_update=True)
+job = BackfillJob(dag=subdag,
+  start_date=start_date,
+  end_date=start_date,
+  executor=executor,
+  donot_pickle=True)
+job.run()
+
+history = executor.history
+subdag_history = history[0]
+
+# check that all 5 task instances of the subdag 'section-1' were executed
+self.assertEqual(5, len(subdag_history))
+for sdh in subdag_history:
+ti = sdh[3]
+self.assertIn('section-1-task-', ti.task_id)
+
+subdag.clear()
+ 

incubator-airflow git commit: [AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

2017-04-04 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 68b1c982e -> 2bebeaf95


[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than or
equal to the end date before creating a DAG run and task instances. For
SubDAGs, the next run date is not relevant, i.e. schedule_interval can
be anything other than None or '@once' and should be ignored. However,
the current code calculates the next run date for a SubDAG and the
condition check mentioned above always fails for a SubDAG triggered
manually.

This change adds a simple check to determine if this is a SubDAG and,
if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

(cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2bebeaf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2bebeaf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2bebeaf9

Branch: refs/heads/v1-8-test
Commit: 2bebeaf9554d35710de6eb1b4006157e105ac79b
Parents: 68b1c98
Author: Joe Schmid 
Authored: Tue Apr 4 08:27:45 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:28:07 2017 +0200

--
 airflow/jobs.py   |  7 +--
 airflow/models.py |  1 +
 tests/jobs.py | 28 
 3 files changed, 34 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 222d9ba..7db9b9c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob):
 
 # consider max_active_runs but ignore when running subdags
 # "parent.child" as a dag_id is by convention a subdag
-if self.dag.schedule_interval and "." not in self.dag.dag_id:
+if self.dag.schedule_interval and not self.dag.is_subdag:
 active_runs = DagRun.find(
 dag_id=self.dag.dag_id,
 state=State.RUNNING,
@@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob):
 
 # create dag runs
 dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
-next_run_date = self.dag.normalize_schedule(dr_start_date)
 end_date = end_date or datetime.now()
+# next run date for a subdag isn't relevant (schedule_interval for subdags
+# is ignored) so we use the dag run's start date in the case of a subdag
+next_run_date = (self.dag.normalize_schedule(dr_start_date)
+ if not self.dag.is_subdag else dr_start_date)
 
 active_dag_runs = []
 while next_run_date and next_run_date <= end_date:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index bdda701..fdff54e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2682,6 +2682,7 @@ class DAG(BaseDag, LoggingMixin):
 self.sla_miss_callback = sla_miss_callback
 self.orientation = orientation
 self.catchup = catchup
+self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate
 
 self.partial = False
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/tests/jobs.py
--
diff --git a/tests/jobs.py b/tests/jobs.py
index aee0e9c..f9ede68 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase):
 else:
 self.assertEqual(State.NONE, ti.state)
 
+def test_backfill_execute_subdag(self):
+dag = self.dagbag.get_dag('example_subdag_operator')
+subdag_op_task = dag.get_task('section-1')
+
+subdag = subdag_op_task.subdag
+subdag.schedule_interval = '@daily'
+
+start_date = datetime.datetime.now()
+executor = TestExecutor(do_update=True)
+job = BackfillJob(dag=subdag,
+  start_date=start_date,
+  end_date=start_date,
+  executor=executor,
+  donot_pickle=True)
+job.run()
+
+history = executor.history
+subdag_history = history[0]
+
+# check that all 5 task instances of the subdag 'section-1' were executed
+self.assertEqual(5, len(subdag_history))
+for