[jira] [Commented] (AIRFLOW-1060) dag lost tracking the status of tasks and stuck in running state

2017-04-06 Thread Adam Whitlock (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15960002#comment-15960002
 ] 

Adam Whitlock commented on AIRFLOW-1060:


[~jeffliujing] - I believe you might want a different Adam for this question. I 
could help out if it were Azkaban, but not Airflow. Sorry! 

> dag lost tracking the status of tasks and stuck in running state
> 
>
> Key: AIRFLOW-1060
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1060
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Jeff Liu
>
> I'm running Airflow 1.7.1 in one of my environments and constantly run into 
> an issue where the main dag status is stuck in the "running" state, while the 
> tasks have all completed successfully.
> To resolve the issue, I had to "delete" the dag entry in the Airflow UI and 
> re-run the job manually so the dag run could recognize that the tasks are all 
> completed and set itself to successful after the re-run. 
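
A minimal diagnostic sketch for this kind of stuck run, assuming direct access
to the Airflow metadata database via the ORM session and the DagRun/TaskInstance
model names as they exist in Airflow 1.7/1.8 (an illustration, not an official
recovery procedure):

{code}
# Hedged sketch: find dag runs still marked "running" whose task instances
# have all finished (model and attribute names assumed from Airflow 1.7/1.8).
from airflow import settings
from airflow.models import DagRun, TaskInstance
from airflow.utils.state import State

session = settings.Session()
for run in session.query(DagRun).filter(DagRun.state == State.RUNNING):
    tis = session.query(TaskInstance).filter(
        TaskInstance.dag_id == run.dag_id,
        TaskInstance.execution_date == run.execution_date,
    ).all()
    if tis and all(ti.state == State.SUCCESS for ti in tis):
        print('Possibly stuck run: %s @ %s' % (run.dag_id, run.execution_date))
session.close()
{code}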





[jira] [Commented] (AIRFLOW-1060) dag lost tracking the status of tasks and stuck in running state

2017-04-06 Thread Jeff Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959984#comment-15959984
 ] 

Jeff Liu commented on AIRFLOW-1060:
---

[~alloydwhitlock] Adam, any idea on this issue?

> dag lost tracking the status of tasks and stuck in running state
> 
>
> Key: AIRFLOW-1060
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1060
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Jeff Liu
>
> I'm running Airflow 1.7.1 in one of my environments and constantly run into 
> an issue where the main dag status is stuck in the "running" state, while the 
> tasks have all completed successfully.
> To resolve the issue, I had to "delete" the dag entry in the Airflow UI and 
> re-run the job manually so the dag run could recognize that the tasks are all 
> completed and set itself to successful after the re-run. 





[jira] [Created] (AIRFLOW-1082) Graph legend pushes graph to flat lines if many operators are present

2017-04-06 Thread Michael Erdely (JIRA)
Michael Erdely created AIRFLOW-1082:
---

 Summary: Graph legend pushes graph to flat lines if many operators 
are present
 Key: AIRFLOW-1082
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1082
 Project: Apache Airflow
  Issue Type: Bug
Affects Versions: 1.8.0
Reporter: Michael Erdely
 Attachments: landing-times.png

If many operators are present, the legend representing these operators pushes 
the graph down so that it renders as flat lines. This occurs on the Task 
Duration, Task Tries and Landing Times tabs.

This is new in 1.8.0.

See the attachment for reference.





[jira] [Created] (AIRFLOW-1081) Task duration page is slow

2017-04-06 Thread Alex Guziel (JIRA)
Alex Guziel created AIRFLOW-1081:


 Summary: Task duration page is slow
 Key: AIRFLOW-1081
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1081
 Project: Apache Airflow
  Issue Type: Bug
Reporter: Alex Guziel
Assignee: Alex Guziel


The page issues a number of queries proportional to the data size, instead of a 
constant two.
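
For context, this is the classic N+1 query pattern. A hedged sketch of the
difference (SQLAlchemy-style; the session setup and the exact columns the page
queries are assumptions, not the actual view code):

{code}
# Hedged sketch of the N+1 pattern described above.
from airflow import settings
from airflow.models import TaskInstance

session = settings.Session()
dag_id = 'example_dag'           # hypothetical
task_ids = ['t1', 't2', 't3']    # hypothetical

# Slow: one query per task, so the query count grows with the data size.
durations = {}
for task_id in task_ids:
    durations[task_id] = session.query(TaskInstance.duration).filter(
        TaskInstance.dag_id == dag_id,
        TaskInstance.task_id == task_id,
    ).all()

# Fast: a constant number of queries, fetching all rows at once.
rows = session.query(TaskInstance.task_id, TaskInstance.duration).filter(
    TaskInstance.dag_id == dag_id,
    TaskInstance.task_id.in_(task_ids),
).all()
{code}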





[jira] [Resolved] (AIRFLOW-1028) Databricks Operator for Airflow

2017-04-06 Thread Arthur Wiedmer (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arthur Wiedmer resolved AIRFLOW-1028.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Issue resolved by pull request #2202
[https://github.com/apache/incubator-airflow/pull/2202]

> Databricks Operator for Airflow
> ---
>
> Key: AIRFLOW-1028
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1028
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Andrew Chen
>Assignee: Andrew Chen
> Fix For: 1.9.0
>
>
> It would be nice to have a Databricks Operator/Hook in Airflow so users of 
> Databricks can more easily integrate with Airflow.
> The operator would submit a spark job to our new /jobs/runs/submit endpoint. 
> This endpoint is similar to 
> https://docs.databricks.com/api/latest/jobs.html#jobscreatejob but does not 
> include the email_notifications, max_retries, min_retry_interval_millis, 
> retry_on_timeout, schedule, max_concurrent_runs fields. (The submit docs are 
> not out because it's still a private endpoint.)
> Our proposed design for the operator, then, is to match this REST API 
> endpoint. Each parameter of the operator is named after one of the fields of 
> the REST API request, and the value of the argument will match the type 
> expected by the REST API. We will also merge extra keys from kwargs that 
> should not be passed to the BaseOperator into our API call, in order to be 
> flexible to API updates.
> If this interface turns out not to be very user friendly, we can later add 
> more operators which extend this one.
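
A hedged sketch of what this design implies for usage: each keyword argument
mirrors a /jobs/runs/submit request field, with the values below taken from the
example DAG merged in the linked PR:

{code}
# Hedged usage sketch: each keyword argument maps to a /jobs/runs/submit field.
from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator

notebook_run = DatabricksSubmitRunOperator(
    task_id='notebook_run',
    dag=dag,  # an existing DAG object
    new_cluster={
        'spark_version': '2.1.0-db3-scala2.11',
        'node_type_id': 'r3.xlarge',
        'num_workers': 2,
    },
    notebook_task={'notebook_path': '/Users/airflow@example.com/PrepareData'},
)
{code}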





[jira] [Commented] (AIRFLOW-1028) Databricks Operator for Airflow

2017-04-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959101#comment-15959101
 ] 

ASF subversion and git services commented on AIRFLOW-1028:
--

Commit 53ca5084561fd5c13996609f2eda6baf717249b5 in incubator-airflow's branch 
refs/heads/master from [~andrewmchen]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=53ca508 ]

[AIRFLOW-1028] Databricks Operator for Airflow

Add DatabricksSubmitRun Operator

In this PR, we contribute a DatabricksSubmitRun operator and a
Databricks hook. This operator enables easy integration of Airflow
with Databricks. In addition to the operator, we have created a
databricks_default connection, an example_dag using this
DatabricksSubmitRunOperator, and matching documentation.

Closes #2202 from andrewmchen/databricks-operator-
squashed


> Databricks Operator for Airflow
> ---
>
> Key: AIRFLOW-1028
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1028
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Andrew Chen
>Assignee: Andrew Chen
>
> It would be nice to have a Databricks Operator/Hook in Airflow so users of 
> Databricks can more easily integrate with Airflow.
> The operator would submit a spark job to our new /jobs/runs/submit endpoint. 
> This endpoint is similar to 
> https://docs.databricks.com/api/latest/jobs.html#jobscreatejob but does not 
> include the email_notifications, max_retries, min_retry_interval_millis, 
> retry_on_timeout, schedule, max_concurrent_runs fields. (The submit docs are 
> not out because it's still a private endpoint.)
> Our proposed design for the operator, then, is to match this REST API 
> endpoint. Each parameter of the operator is named after one of the fields of 
> the REST API request, and the value of the argument will match the type 
> expected by the REST API. We will also merge extra keys from kwargs that 
> should not be passed to the BaseOperator into our API call, in order to be 
> flexible to API updates.
> If this interface turns out not to be very user friendly, we can later add 
> more operators which extend this one.





incubator-airflow git commit: [AIRFLOW-1028] Databricks Operator for Airflow

2017-04-06 Thread arthur
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5a6f18f1c -> 53ca50845


[AIRFLOW-1028] Databricks Operator for Airflow

Add DatabricksSubmitRun Operator

In this PR, we contribute a DatabricksSubmitRun operator and a
Databricks hook. This operator enables easy integration of Airflow
with Databricks. In addition to the operator, we have created a
databricks_default connection, an example_dag using this
DatabricksSubmitRunOperator, and matching documentation.

Closes #2202 from andrewmchen/databricks-operator-
squashed


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/53ca5084
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/53ca5084
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/53ca5084

Branch: refs/heads/master
Commit: 53ca5084561fd5c13996609f2eda6baf717249b5
Parents: 5a6f18f
Author: Andrew Chen 
Authored: Thu Apr 6 08:30:01 2017 -0700
Committer: Arthur Wiedmer 
Committed: Thu Apr 6 08:30:33 2017 -0700

--
 .../example_dags/example_databricks_operator.py |  82 +++
 airflow/contrib/hooks/databricks_hook.py| 202 +
 .../contrib/operators/databricks_operator.py| 211 +
 airflow/exceptions.py   |   2 +-
 airflow/models.py   |   1 +
 airflow/utils/db.py |   4 +
 docs/code.rst   |   1 +
 docs/integration.rst|  13 ++
 setup.py|   2 +
 tests/contrib/hooks/databricks_hook.py  | 226 +++
 tests/contrib/operators/databricks_operator.py  | 185 +++
 11 files changed, 928 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53ca5084/airflow/contrib/example_dags/example_databricks_operator.py
--
diff --git a/airflow/contrib/example_dags/example_databricks_operator.py b/airflow/contrib/example_dags/example_databricks_operator.py
new file mode 100644
index 000..abf6844
--- /dev/null
+++ b/airflow/contrib/example_dags/example_databricks_operator.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import airflow
+
+from airflow import DAG
+from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
+
+# This is an example DAG which uses the DatabricksSubmitRunOperator.
+# In this example, we create two tasks which execute sequentially.
+# The first task is to run a notebook at the workspace path "/test"
+# and the second task is to run a JAR uploaded to DBFS. Both
+# tasks use new clusters.
+#
+# Because we have set a downstream dependency on the notebook task,
+# the spark jar task will NOT run until the notebook task completes
+# successfully.
+#
+# The definition of a successful run is if the run has a result_state of "SUCCESS".
+# For more information about the state of a run refer to
+# https://docs.databricks.com/api/latest/jobs.html#runstate
+
+args = {
+    'owner': 'airflow',
+    'email': ['airf...@example.com'],
+    'depends_on_past': False,
+    'start_date': airflow.utils.dates.days_ago(2)
+}
+
+dag = DAG(
+    dag_id='example_databricks_operator', default_args=args,
+    schedule_interval='@daily')
+
+new_cluster = {
+    'spark_version': '2.1.0-db3-scala2.11',
+    'node_type_id': 'r3.xlarge',
+    'aws_attributes': {
+        'availability': 'ON_DEMAND'
+    },
+    'num_workers': 8
+}
+
+notebook_task_params = {
+    'new_cluster': new_cluster,
+    'notebook_task': {
+        'notebook_path': '/Users/airf...@example.com/PrepareData',
+    },
+}
+# Example of using the JSON parameter to initialize the operator.
+notebook_task = DatabricksSubmitRunOperator(
+    task_id='notebook_task',
+    dag=dag,
+    json=notebook_task_params)
+
+# Example of using the named parameters of DatabricksSubmitRunOperator
+# to initialize the operator.
+spark_jar_task = DatabricksSubmitRunOperator(
+    task_id='spark_jar_task',
+    dag=dag,
+    new_cluster=new_cluster,
+    spark_jar_task={
+        'main_class_name': 'com.example.ProcessData'
+ 

[jira] [Updated] (AIRFLOW-1050) Retries ignored - regression

2017-04-06 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin updated AIRFLOW-1050:

Component/s: backfill

> Retries ignored - regression
> 
>
> Key: AIRFLOW-1050
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1050
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: backfill
>Affects Versions: 1.8.0
>Reporter: Ján Koščo
>Assignee: Bolke de Bruin
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: Screen Shot 2017-03-28 at 11.15.51.png, Screen Shot 
> 2017-03-28 at 11.15.59.png
>
>
> SubDag fails when the first operator fails, despite the fact that it's 
> configured for retries. Information in the UI afterwards is also incorrect. 
> From the SubDag perspective it's still {{running}} with the operator marked 
> as {{up_for_retry}}; from the main DAG perspective, the whole run is marked 
> as {{failed}}, same as the SubDag. See the attached screenshots. The latest 
> unaffected version is RC4 (310fb58). I tested RC5 and 1.8.0 with 
> LocalExecutor and CeleryExecutor.
> Example code:
> {code}
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
> args = {
>     "start_date": datetime.today(),
> }
> dag = DAG(
>     dag_id="main", default_args=args,
>     dagrun_timeout=timedelta(minutes=60),
>     schedule_interval=None,
>     max_active_runs=1
> )
> sub_dag = DAG(
>     dag_id="main.test",
>     default_args=args,
>     schedule_interval=None,
> )
> op = BashOperator(
>     task_id="first",
>     dag=sub_dag,
>     bash_command="echo 1"
> )
> def throw_error():
>     raise RuntimeError()
> op2 = PythonOperator(
>     task_id="second",
>     dag=sub_dag,
>     python_callable=throw_error,
>     retries=3,
>     retry_delay=timedelta(0, 20)
> )
> op >> op2
> prepare_environment = SubDagOperator(
>     task_id='test',
>     subdag=sub_dag,
>     default_args=args,
>     dag=dag,
> )
> {code}





[jira] [Assigned] (AIRFLOW-1050) Retries ignored - regression

2017-04-06 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin reassigned AIRFLOW-1050:
---

Assignee: Bolke de Bruin

> Retries ignored - regression
> 
>
> Key: AIRFLOW-1050
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1050
> Project: Apache Airflow
>  Issue Type: Bug
>Affects Versions: 1.8.0
>Reporter: Ján Koščo
>Assignee: Bolke de Bruin
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: Screen Shot 2017-03-28 at 11.15.51.png, Screen Shot 
> 2017-03-28 at 11.15.59.png
>
>
> SubDag fails when the first operator fails, despite the fact that it's 
> configured for retries. Information in the UI afterwards is also incorrect. 
> From the SubDag perspective it's still {{running}} with the operator marked 
> as {{up_for_retry}}; from the main DAG perspective, the whole run is marked 
> as {{failed}}, same as the SubDag. See the attached screenshots. The latest 
> unaffected version is RC4 (310fb58). I tested RC5 and 1.8.0 with 
> LocalExecutor and CeleryExecutor.
> Example code:
> {code}
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
> args = {
>     "start_date": datetime.today(),
> }
> dag = DAG(
>     dag_id="main", default_args=args,
>     dagrun_timeout=timedelta(minutes=60),
>     schedule_interval=None,
>     max_active_runs=1
> )
> sub_dag = DAG(
>     dag_id="main.test",
>     default_args=args,
>     schedule_interval=None,
> )
> op = BashOperator(
>     task_id="first",
>     dag=sub_dag,
>     bash_command="echo 1"
> )
> def throw_error():
>     raise RuntimeError()
> op2 = PythonOperator(
>     task_id="second",
>     dag=sub_dag,
>     python_callable=throw_error,
>     retries=3,
>     retry_delay=timedelta(0, 20)
> )
> op >> op2
> prepare_environment = SubDagOperator(
>     task_id='test',
>     subdag=sub_dag,
>     default_args=args,
>     dag=dag,
> )
> {code}





[jira] [Commented] (AIRFLOW-1075) Cleanup security docs

2017-04-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958803#comment-15958803
 ] 

ASF subversion and git services commented on AIRFLOW-1075:
--

Commit 5a6f18f1caebc195569be4397bfe8cb36fec3f1a in incubator-airflow's branch 
refs/heads/master from [~dxhuang]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=5a6f18f ]

[AIRFLOW-1075] Security docs cleanup

Closes # from dhuang/AIRFLOW-1075


> Cleanup security docs
> -
>
> Key: AIRFLOW-1075
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1075
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: docs
>Reporter: Daniel Huang
>Assignee: Daniel Huang
>Priority: Trivial
> Fix For: 1.9.0
>
>
> Noticed a few minor things to fix, like "Impersonation" being under "SSL" 
> section.





[jira] [Resolved] (AIRFLOW-1075) Cleanup security docs

2017-04-06 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin resolved AIRFLOW-1075.
-
   Resolution: Fixed
Fix Version/s: 1.9.0

Issue resolved by pull request #
[https://github.com/apache/incubator-airflow/pull/]

> Cleanup security docs
> -
>
> Key: AIRFLOW-1075
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1075
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: docs
>Reporter: Daniel Huang
>Assignee: Daniel Huang
>Priority: Trivial
> Fix For: 1.9.0
>
>
> Noticed a few minor things to fix, like "Impersonation" being under "SSL" 
> section.





[jira] [Commented] (AIRFLOW-1075) Cleanup security docs

2017-04-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958802#comment-15958802
 ] 

ASF subversion and git services commented on AIRFLOW-1075:
--

Commit 5a6f18f1caebc195569be4397bfe8cb36fec3f1a in incubator-airflow's branch 
refs/heads/master from [~dxhuang]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=5a6f18f ]

[AIRFLOW-1075] Security docs cleanup

Closes # from dhuang/AIRFLOW-1075


> Cleanup security docs
> -
>
> Key: AIRFLOW-1075
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1075
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: docs
>Reporter: Daniel Huang
>Assignee: Daniel Huang
>Priority: Trivial
> Fix For: 1.9.0
>
>
> Noticed a few minor things to fix, like "Impersonation" being under "SSL" 
> section.





incubator-airflow git commit: [AIRFLOW-1075] Security docs cleanup

2017-04-06 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master fbcbd053a -> 5a6f18f1c


[AIRFLOW-1075] Security docs cleanup

Closes # from dhuang/AIRFLOW-1075


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5a6f18f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5a6f18f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5a6f18f1

Branch: refs/heads/master
Commit: 5a6f18f1caebc195569be4397bfe8cb36fec3f1a
Parents: fbcbd05
Author: Daniel Huang 
Authored: Thu Apr 6 14:12:13 2017 +0200
Committer: Bolke de Bruin 
Committed: Thu Apr 6 14:12:13 2017 +0200

--
 docs/security.rst | 62 ++
 1 file changed, 37 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a6f18f1/docs/security.rst
--
diff --git a/docs/security.rst b/docs/security.rst
index 7c06fe3..c0e2918 100644
--- a/docs/security.rst
+++ b/docs/security.rst
@@ -6,7 +6,7 @@ to the web application is to do it at the network level, or by using
 SSH tunnels.
 
 It is however possible to switch on authentication by either using one of the supplied
-backends or create your own.
+backends or creating your own.
 
 Web Authentication
 --
@@ -89,7 +89,7 @@ Roll your own
 
 Airflow uses ``flask_login`` and
 exposes a set of hooks in the ``airflow.default_login`` module. You can
-alter the content and make it part of the ``PYTHONPATH`` and configure it as a backend in ``airflow.cfg```.
+alter the content and make it part of the ``PYTHONPATH`` and configure it as a backend in ``airflow.cfg``.
 
 .. code-block:: bash
 
@@ -100,12 +100,14 @@ alter the content and make it part of the ``PYTHONPATH`` and configure it as a b
 Multi-tenancy
 -
 
-You can filter the list of dags in webserver by owner name, when authentication
-is turned on, by setting webserver.filter_by_owner as true in your ``airflow.cfg``
-With this, when a user authenticates and logs into webserver, it will see only the dags
-which it is owner of. A super_user, will be able to see all the dags although.
-This makes the web UI a multi-tenant UI, where a user will only be able to see dags
-created by itself.
+You can filter the list of dags in webserver by owner name when authentication
+is turned on by setting ``webserver:filter_by_owner`` in your config. With this, a user will see 
+only the dags which it is owner of, unless it is a superuser.
+
+.. code-block:: bash
+
+    [webserver]
+    filter_by_owner = True
 
 
 Kerberos
@@ -118,17 +120,18 @@ to authenticate against kerberized services.
 Limitations
 '''
 
-Please note that at this time not all hooks have been adjusted to make use of this functionality yet.
+Please note that at this time, not all hooks have been adjusted to make use of this functionality.
 Also it does not integrate kerberos into the web interface and you will have to rely on network
 level security for now to make sure your service remains secure.
 
-Celery integration has not been tried and tested yet. However if you generate a key tab for every host
-and launch a ticket renewer next to every worker it will most likely work.
+Celery integration has not been tried and tested yet. However, if you generate a key tab for every
+host and launch a ticket renewer next to every worker it will most likely work.
 
 Enabling kerberos
 '
 
- Airflow
+Airflow
+^^^
 
 To enable kerberos you will need to generate a (service) key tab.
 
@@ -160,7 +163,8 @@ Launch the ticket renewer by
     # run ticket renewer
     airflow kerberos
 
- Hadoop
+Hadoop
+^^
 
 If want to use impersonation this needs to be enabled in ``core-site.xml`` of your hadoop config.
 
@@ -186,8 +190,8 @@ Of course if you need to tighten your security replace the asterisk with somethi
 Using kerberos authentication
 '
 
-The hive hook has been updated to take advantage of kerberos authentication. To allow your DAGs to use it simply
-update the connection details with, for example:
+The hive hook has been updated to take advantage of kerberos authentication. To allow your DAGs to
+use it, simply update the connection details with, for example:
 
 .. code-block:: bash
 
@@ -197,7 +201,7 @@ Adjust the principal to your settings. The _HOST part will be replaced by the fu
 the server.
 
 You can specify if you would like to use the dag owner as the user for the connection or the user specified in the login
-section of the connection. For the login user specify the following as extra:
+section of the connection. For the login user, specify the 

[jira] [Resolved] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py

2017-04-06 Thread Bolke de Bruin (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bolke de Bruin resolved AIRFLOW-1033.
-
Resolution: Fixed

Issue resolved by pull request #2220
[https://github.com/apache/incubator-airflow/pull/2220]

> TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
> 
>
> Key: AIRFLOW-1033
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1033
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DagRun
>Affects Versions: 1.8.1
> Environment: Centos 7; 
> db: PostgreSQL 9.5
> python version: 2.7
> Installation via pip
>Reporter: Bert Desmet
>Priority: Blocker
>  Labels: bug, interval
> Fix For: 1.8.1
>
> Attachments: test_dag.py, test_dag.py.log
>
>
> Dear, 
> When starting a specific new dag we get the following error:
> [2017-03-23 16:51:16,354] {jobs.py:354} DagFileProcessor908 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
> pickle_dags)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in 
> process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in 
> _process_dags
> self._process_task_instances(dag, tis_out)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in 
> _process_task_instances
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in 
> are_dependencies_met
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in 
> get_failed_dep_statuses
> dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 
> 94, in get_dep_statuses
> for dep_status in self._get_dep_statuses(ti, session, dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", 
> line 47, in _get_dep_statuses
> if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> TypeError: can't compare datetime.datetime to NoneType
> I have added some debug code to the file 'prev_dagrun_dep.py':
> dag = ti.task.dag
> print 'Start dates:'
> print 'previous_execution_date: %s' % (dag.previous_schedule(ti.execution_date))
> print 'current start date: %s' % (ti.task.start_date)
> if dag.catchup:
>     if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> And this is the output I get:
> Start dates:
> previous_execution_date: None
> current start date: 2017-03-19 00:00:00
> I think it is normal that the previous_execution_date is None, since this is 
> the first time this dag is being run. But why is the start_date of the dag 
> important, and not the start date of the run? 
> I have the feeling the cause is the 'schedule_interval', which is set to 
> None. 
> Please find an example and its log file as an attachment to this mail. 
> Bert
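
For reference, a minimal sketch of the kind of guard the fix implies; this is
an illustration of the failure mode described above, not the exact patch merged
in #2220:

{code}
# Hedged sketch: avoid comparing None with a datetime when the DAG has no
# schedule (schedule_interval=None or @once). Illustrative only.
prev_schedule = dag.previous_schedule(ti.execution_date)
if dag.catchup and prev_schedule is not None:
    if prev_schedule < ti.task.start_date:
        pass  # depend on the previous dag run, as before
{code}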





[jira] [Commented] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py

2017-04-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958795#comment-15958795
 ] 

ASF subversion and git services commented on AIRFLOW-1033:
--

Commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=fbcbd05 ]

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once)
make the dependency
checker raise an exception as the previous
schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033


> TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
> 
>
> Key: AIRFLOW-1033
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1033
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DagRun
>Affects Versions: 1.8.1
> Environment: Centos 7; 
> db: PostgreSQL 9.5
> python version: 2.7
> Installation via pip
>Reporter: Bert Desmet
>Priority: Blocker
>  Labels: bug, interval
> Fix For: 1.8.1
>
> Attachments: test_dag.py, test_dag.py.log
>
>
> Dear, 
> When starting a specific new dag we get the following error:
> [2017-03-23 16:51:16,354] {jobs.py:354} DagFileProcessor908 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
> pickle_dags)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in 
> process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in 
> _process_dags
> self._process_task_instances(dag, tis_out)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in 
> _process_task_instances
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in 
> are_dependencies_met
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in 
> get_failed_dep_statuses
> dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 
> 94, in get_dep_statuses
> for dep_status in self._get_dep_statuses(ti, session, dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", 
> line 47, in _get_dep_statuses
> if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> TypeError: can't compare datetime.datetime to NoneType
> I have added some debug code to the file 'prev_dagrun_dep.py':
> dag = ti.task.dag
> print 'Start dates:'
> print 'previous_execution_date: %s' % (dag.previous_schedule(ti.execution_date))
> print 'current start date: %s' % (ti.task.start_date)
> if dag.catchup:
>     if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> And this is the output I get:
> Start dates:
> previous_execution_date: None
> current start date: 2017-03-19 00:00:00
> I think it is normal that the previous_execution_date is None, since this is 
> the first time this dag is being run. But why is the start_date of the dag 
> important, and not the start date of the run? 
> I have the feeling the cause is the 'schedule_interval', which is set to 
> None. 
> Please find an example and its log file as an attachment to this mail. 
> Bert





[jira] [Commented] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py

2017-04-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958796#comment-15958796
 ] 

ASF subversion and git services commented on AIRFLOW-1033:
--

Commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab in incubator-airflow's branch 
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=fbcbd05 ]

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once)
make the dependency
checker raise an exception as the previous
schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033


> TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
> 
>
> Key: AIRFLOW-1033
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1033
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DagRun
>Affects Versions: 1.8.1
> Environment: Centos 7; 
> db: PostgreSQL 9.5
> python version: 2.7
> Installation via pip
>Reporter: Bert Desmet
>Priority: Blocker
>  Labels: bug, interval
> Fix For: 1.8.1
>
> Attachments: test_dag.py, test_dag.py.log
>
>
> Dear, 
> When starting a specific new dag we get the following error:
> [2017-03-23 16:51:16,354] {jobs.py:354} DagFileProcessor908 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
> pickle_dags)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in 
> process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in 
> _process_dags
> self._process_task_instances(dag, tis_out)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in 
> _process_task_instances
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in 
> are_dependencies_met
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in 
> get_failed_dep_statuses
> dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 
> 94, in get_dep_statuses
> for dep_status in self._get_dep_statuses(ti, session, dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", 
> line 47, in _get_dep_statuses
> if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> TypeError: can't compare datetime.datetime to NoneType
> I have added some debug code to the file 'prev_dagrun_dep.py':
> dag = ti.task.dag
> print 'Start dates:'
> print 'previous_execution_date: %s' % (dag.previous_schedule(ti.execution_date))
> print 'current start date: %s' % (ti.task.start_date)
> if dag.catchup:
>     if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> And this is the output I get:
> Start dates:
> previous_execution_date: None
> current start date: 2017-03-19 00:00:00
> I think it is normal that the previous_execution_date is None, since this is 
> the first time this dag is being run. But why is the start_date of the dag 
> important, and not the start date of the run? 
> I have the feeling the cause is the 'schedule_interval', which is set to 
> None. 
> Please find an example and its log file as an attachment to this mail. 
> Bert





[jira] [Commented] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py

2017-04-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958798#comment-15958798
 ] 

ASF subversion and git services commented on AIRFLOW-1033:
--

Commit ebfc3ea73ae1ffe273e4ff532f1ad47441bef518 in incubator-airflow's branch 
refs/heads/v1-8-test from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ebfc3ea ]

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once)
make the dependency
checker raise an exception as the previous
schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin 


> TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
> 
>
> Key: AIRFLOW-1033
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1033
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DagRun
>Affects Versions: 1.8.1
> Environment: Centos 7; 
> db: PostgreSQL 9.5
> python version: 2.7
> Installation via pip
>Reporter: Bert Desmet
>Priority: Blocker
>  Labels: bug, interval
> Fix For: 1.8.1
>
> Attachments: test_dag.py, test_dag.py.log
>
>
> Dear, 
> When starting a specific new dag we get the following error:
> [2017-03-23 16:51:16,354] {jobs.py:354} DagFileProcessor908 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
> pickle_dags)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in 
> process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in 
> _process_dags
> self._process_task_instances(dag, tis_out)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in 
> _process_task_instances
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in 
> are_dependencies_met
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in 
> get_failed_dep_statuses
> dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 
> 94, in get_dep_statuses
> for dep_status in self._get_dep_statuses(ti, session, dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", 
> line 47, in _get_dep_statuses
> if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> TypeError: can't compare datetime.datetime to NoneType
> I have added some debug code to the file 'prev_dagrun_dep.py':
> dag = ti.task.dag
> print 'Start dates:'
> print 'previous_execution_date: %s' % (dag.previous_schedule(ti.execution_date))
> print 'current start date: %s' % (ti.task.start_date)
> if dag.catchup:
>     if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> And this is the output I get:
> Start dates:
> previous_execution_date: None
> current start date: 2017-03-19 00:00:00
> I think it is normal that the previous_execution_date is None, since this is 
> the first time this dag is being run. But why is the start_date of the dag 
> important, and not the start date of the run? 
> I have the feeling the cause is the 'schedule_interval', which is set to 
> None. 
> Please find an example and its log file as an attachment to this mail. 
> Bert





incubator-airflow git commit: [AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

2017-04-06 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 916741171 -> ebfc3ea73


[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once)
make the dependency
checker raise an exception as the previous
schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ebfc3ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ebfc3ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ebfc3ea7

Branch: refs/heads/v1-8-test
Commit: ebfc3ea73ae1ffe273e4ff532f1ad47441bef518
Parents: 9167411
Author: Bolke de Bruin 
Authored: Thu Apr 6 14:03:11 2017 +0200
Committer: Bolke de Bruin 
Committed: Thu Apr 6 14:03:24 2017 +0200

--
 airflow/ti_deps/deps/base_ti_dep.py |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py |   5 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py  |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py  |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py   |  61 
 tests/ti_deps/deps/not_running_dep.py   |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py   |  38 ---
 tests/ti_deps/deps/pool_has_space_dep.py|  37 ---
 tests/ti_deps/deps/prev_dagrun_dep.py   | 143 -
 tests/ti_deps/deps/runnable_exec_date_dep.py|  92 --
 .../deps/test_dag_ti_slots_available_dep.py |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py|  40 +++
 .../deps/test_not_in_retry_period_dep.py|  59 
 tests/ti_deps/deps/test_not_running_dep.py  |  37 +++
 tests/ti_deps/deps/test_not_skipped_dep.py  |  36 +++
 tests/ti_deps/deps/test_prev_dagrun_dep.py  | 123 
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  76 +
 tests/ti_deps/deps/test_trigger_rule_dep.py | 252 
 tests/ti_deps/deps/test_valid_state_dep.py  |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py  | 295 ---
 tests/ti_deps/deps/valid_state_dep.py   |  49 ---
 23 files changed, 768 insertions(+), 881 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/airflow/ti_deps/deps/base_ti_dep.py
--
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index 0188043..bad1fa0 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -51,7 +51,7 @@ class BaseTIDep(object):
         """
         return getattr(self, 'NAME', self.__class__.__name__)
 
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, session, dep_context=None):
         """
         Abstract method that returns an iterable of TIDepStatus objects that describe
         whether the given task instance has this dependency met.
@@ -69,7 +69,7 @@ class BaseTIDep(object):
         raise NotImplementedError
 
     @provide_session
-    def get_dep_statuses(self, ti, session, dep_context):
+    def get_dep_statuses(self, ti, session, dep_context=None):
         """
         Wrapper around the private _get_dep_statuses method that contains some global
         checks for all dependencies.
@@ -81,6 +81,12 @@ class BaseTIDep(object):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
+        # this avoids a circular dependency
+        from airflow.ti_deps.dep_context import DepContext
+
+        if dep_context is None:
+            dep_context = DepContext()
+
         if self.IGNOREABLE and dep_context.ignore_all_deps:
             yield self._passing_status(
                 reason="Context specified all dependencies should be ignored.")
@@ -95,7 +101,7 @@ class BaseTIDep(object):
             yield dep_status
 
     @provide_session
-    def is_met(self, ti, session, dep_context):
+    def is_met(self, ti, session, dep_context=None):
         """
         Returns whether or not this dependency is met for a given task instance. A
         dependency is considered met if all of the dependency statuses it reports are
@@ -113,7 +119,7 @@ class BaseTIDep(object):
             self.get_dep_statuses(ti, session, dep_context))
 
     @provide_session
-    def get_failure_reasons(self, ti, session, dep_context):
+    def get_failure_reasons(self, 

[jira] [Commented] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py

2017-04-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958797#comment-15958797
 ] 

ASF subversion and git services commented on AIRFLOW-1033:
--

Commit ebfc3ea73ae1ffe273e4ff532f1ad47441bef518 in incubator-airflow's branch 
refs/heads/v1-8-test from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ebfc3ea ]

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once)
make the dependency
checker raise an exception as the previous
schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin 


> TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
> 
>
> Key: AIRFLOW-1033
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1033
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DagRun
>Affects Versions: 1.8.1
> Environment: Centos 7; 
> db: PostgreSQL 9.5
> python version: 2.7
> Installation via pip
>Reporter: Bert Desmet
>Priority: Blocker
>  Labels: bug, interval
> Fix For: 1.8.1
>
> Attachments: test_dag.py, test_dag.py.log
>
>
> Dear, 
> When starting a specific new dag we get the following error:
> [2017-03-23 16:51:16,354] {jobs.py:354} DagFileProcessor908 ERROR - Got an 
> exception! Propagating...
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
> pickle_dags)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in 
> process_file
> self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in 
> _process_dags
> self._process_task_instances(dag, tis_out)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in 
> _process_task_instances
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in 
> wrapper
> result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in 
> are_dependencies_met
> session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in 
> get_failed_dep_statuses
> dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 
> 94, in get_dep_statuses
> for dep_status in self._get_dep_statuses(ti, session, dep_context):
>   File 
> "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", 
> line 47, in _get_dep_statuses
> if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> TypeError: can't compare datetime.datetime to NoneType
> I have added some debug code to the file 'prev_dagrun_dep.py':
> dag = ti.task.dag
> print 'Start dates:'
> print 'previous_execution_date: %s' % (dag.previous_schedule(ti.execution_date))
> print 'current start date: %s' % (ti.task.start_date)
> if dag.catchup:
>     if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> And this is the output I get:
> Start dates:
> previous_execution_date: None
> current start date: 2017-03-19 00:00:00
> I think it is normal that the previous_execution_date is None, since this is 
> the first time this dag is being run. But why is the start_date of the dag 
> important, and not the start date of the run? 
> I have the feeling the cause is the 'schedule_interval', which is set to 
> None. 
> Please find an example and its log file as an attachment to this mail. 
> Bert





incubator-airflow git commit: [AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

2017-04-06 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4c41f6e96 -> fbcbd053a


[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once)
make the dependency
checker raise an exception as the previous
schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fbcbd053
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fbcbd053
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fbcbd053

Branch: refs/heads/master
Commit: fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab
Parents: 4c41f6e
Author: Bolke de Bruin 
Authored: Thu Apr 6 14:03:11 2017 +0200
Committer: Bolke de Bruin 
Committed: Thu Apr 6 14:03:11 2017 +0200

--
 airflow/ti_deps/deps/base_ti_dep.py |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py |   5 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py  |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py  |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py   |  61 
 tests/ti_deps/deps/not_running_dep.py   |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py   |  38 ---
 tests/ti_deps/deps/pool_has_space_dep.py|  37 ---
 tests/ti_deps/deps/prev_dagrun_dep.py   | 143 -
 tests/ti_deps/deps/runnable_exec_date_dep.py|  92 --
 .../deps/test_dag_ti_slots_available_dep.py |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py|  40 +++
 .../deps/test_not_in_retry_period_dep.py|  59 
 tests/ti_deps/deps/test_not_running_dep.py  |  37 +++
 tests/ti_deps/deps/test_not_skipped_dep.py  |  36 +++
 tests/ti_deps/deps/test_prev_dagrun_dep.py  | 123 
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  76 +
 tests/ti_deps/deps/test_trigger_rule_dep.py | 252 
 tests/ti_deps/deps/test_valid_state_dep.py  |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py  | 295 ---
 tests/ti_deps/deps/valid_state_dep.py   |  49 ---
 23 files changed, 768 insertions(+), 881 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fbcbd053/airflow/ti_deps/deps/base_ti_dep.py
--
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index d735264..810852d 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -51,7 +51,7 @@ class BaseTIDep(object):
         """
         return getattr(self, 'NAME', self.__class__.__name__)
 
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, session, dep_context=None):
         """
         Abstract method that returns an iterable of TIDepStatus objects that describe
         whether the given task instance has this dependency met.
@@ -69,7 +69,7 @@ class BaseTIDep(object):
         raise NotImplementedError
 
     @provide_session
-    def get_dep_statuses(self, ti, session, dep_context):
+    def get_dep_statuses(self, ti, session, dep_context=None):
         """
         Wrapper around the private _get_dep_statuses method that contains some global
         checks for all dependencies.
@@ -81,6 +81,12 @@ class BaseTIDep(object):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
+        # this avoids a circular dependency
+        from airflow.ti_deps.dep_context import DepContext
+
+        if dep_context is None:
+            dep_context = DepContext()
+
         if self.IGNOREABLE and dep_context.ignore_all_deps:
             yield self._passing_status(
                 reason="Context specified all dependencies should be ignored.")
@@ -95,7 +101,7 @@ class BaseTIDep(object):
             yield dep_status
 
     @provide_session
-    def is_met(self, ti, session, dep_context):
+    def is_met(self, ti, session, dep_context=None):
         """
         Returns whether or not this dependency is met for a given task instance. A
         dependency is considered met if all of the dependency statuses it reports are
@@ -113,7 +119,7 @@ class BaseTIDep(object):
             self.get_dep_statuses(ti, session, dep_context))
 
     @provide_session
-    def get_failure_reasons(self, ti, session, dep_context):
+    def get_failure_reasons(self, ti, session, dep_context=None):
         """
         Returns an iterable of strings that explain why this dependency wasn't 

[jira] [Created] (AIRFLOW-1080) airflow_local_settings: no effect on task.template_fields changes in policy function

2017-04-06 Thread andrei (JIRA)
andrei created AIRFLOW-1080:
---

 Summary: airflow_local_settings: no effect on task.template_fields 
changes in policy function 
 Key: AIRFLOW-1080
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1080
 Project: Apache Airflow
  Issue Type: Bug
  Components: configuration
Affects Versions: 1.9.0
Reporter: andrei
Priority: Minor


I am trying to add new template_fields to some operators. I have tried to do 
that using the policy() function, but did not succeed. 
Maybe this is the wrong way to change existing operators? 

{quote}
def policy(task):
    if task.__class__.__name__ == 'FileToGoogleCloudStorageOperator':
        print('I am working!')
        task.template_fields = ('bucket', )
        task.resolve_template_files()
        task.render_templates()
{quote}
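
One hedged alternative worth noting: template_fields is normally a class-level
attribute consulted by the templating machinery, so extending it by subclassing
avoids mutating task instances inside policy(). A sketch; the contrib import
path is an assumption and may differ by version:

{code}
# Hedged sketch: extend template_fields via a subclass instead of mutating
# the task inside policy(). The import path below is an assumption.
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator

class TemplatedFileToGCSOperator(FileToGoogleCloudStorageOperator):
    # Jinja rendering is driven by the class attribute, so adding 'bucket'
    # here makes it render like the operator's other templated fields.
    template_fields = tuple(FileToGoogleCloudStorageOperator.template_fields) + ('bucket',)
{code}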





incubator-airflow git commit: [AIRFLOW-969] Catch bad python_callable argument

2017-04-06 Thread bolke
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test dff6d21bf -> 916741171


[AIRFLOW-969] Catch bad python_callable argument

Checks for callable when Operator is
created, not when it is run.

* added initial PythonOperator unit test, testing
run
* python_callable must be callable; added unit
test

Closes #2142 from abloomston/python-callable

(cherry picked from commit 12901ddfa9961a11feaa3f17696d19102ff8ecd0)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/91674117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/91674117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/91674117

Branch: refs/heads/v1-8-test
Commit: 916741171cc0c6426dbcbe8a2b5ce2468fce870d
Parents: dff6d21
Author: abloomston 
Authored: Thu Mar 16 19:36:00 2017 -0400
Committer: Bolke de Bruin 
Committed: Thu Apr 6 09:47:18 2017 +0200

--
 airflow/operators/python_operator.py | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91674117/airflow/operators/python_operator.py
--
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 114bc7e..cf240f2 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -16,6 +16,7 @@ from builtins import str
 from datetime import datetime
 import logging
 
+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
@@ -63,6 +64,8 @@ class PythonOperator(BaseOperator):
             templates_exts=None,
             *args, **kwargs):
         super(PythonOperator, self).__init__(*args, **kwargs)
+        if not callable(python_callable):
+            raise AirflowException('`python_callable` param must be callable')
         self.python_callable = python_callable
         self.op_args = op_args or []
         self.op_kwargs = op_kwargs or {}



[jira] [Commented] (AIRFLOW-969) Catch bad python_callable argument at DAG construction rather than Task run

2017-04-06 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958483#comment-15958483
 ] 

ASF subversion and git services commented on AIRFLOW-969:
-

Commit 916741171cc0c6426dbcbe8a2b5ce2468fce870d in incubator-airflow's branch 
refs/heads/v1-8-test from abloomston
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=9167411 ]

[AIRFLOW-969] Catch bad python_callable argument

Checks for callable when Operator is
created, not when it is run.

* added initial PythonOperator unit test, testing
run
* python_callable must be callable; added unit
test

Closes #2142 from abloomston/python-callable

(cherry picked from commit 12901ddfa9961a11feaa3f17696d19102ff8ecd0)
Signed-off-by: Bolke de Bruin 


> Catch bad python_callable argument at DAG construction rather than Task run
> ---
>
> Key: AIRFLOW-969
> URL: https://issues.apache.org/jira/browse/AIRFLOW-969
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators
>Reporter: Adam Bloomston
>Assignee: Adam Bloomston
>Priority: Minor
> Fix For: 1.9.0
>
>
> If a non-callable value is passed as python_callable to PythonOperator, 
> instantiation should fail. This moves such failures from task run time to 
> DAG instantiation time; better to catch such errors sooner rather than later 
> in execution.
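
A quick illustration of the behavior this change introduces; a hedged sketch,
with the exception message taken from the diff above:

{code}
# Hedged sketch: with the check in place, a bad python_callable now fails at
# DAG-definition time instead of at task run time.
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator

try:
    PythonOperator(task_id='bad', python_callable='not_a_function', dag=dag)  # dag: an existing DAG
except AirflowException as err:
    print(err)  # `python_callable` param must be callable
{code}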


