[jira] [Commented] (AIRFLOW-1060) dag lost tracking the status of tasks and stuck in running state
[ https://issues.apache.org/jira/browse/AIRFLOW-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960002#comment-15960002 ]

Adam Whitlock commented on AIRFLOW-1060:
----------------------------------------

[~jeffliujing] - I believe you might want a different Adam for this question.
I could help out if it were Azkaban, but not Airflow. Sorry!

> dag lost tracking the status of tasks and stuck in running state
> ----------------------------------------------------------------
>
>             Key: AIRFLOW-1060
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1060
>         Project: Apache Airflow
>      Issue Type: Bug
>        Reporter: Jeff Liu
>
> I'm running Airflow 1.7.1 in one of my environments and constantly run into
> an issue where the main dag status is stuck in the "running" state, while
> the tasks have all completed successfully.
> To resolve the issue, I had to "delete" the dag entry in the Airflow UI and
> re-run the job manually so the dag run could recognize that the tasks were
> all completed and set itself to successful after the re-run.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1060) dag lost tracking the status of tasks and stuck in running state
[ https://issues.apache.org/jira/browse/AIRFLOW-1060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959984#comment-15959984 ]

Jeff Liu commented on AIRFLOW-1060:
-----------------------------------

[~alloydwhitlock] Adam, any idea on this issue?

> dag lost tracking the status of tasks and stuck in running state
> ----------------------------------------------------------------
>
>             Key: AIRFLOW-1060
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1060
>         Project: Apache Airflow
>      Issue Type: Bug
>        Reporter: Jeff Liu
>
> I'm running Airflow 1.7.1 in one of my environments and constantly run into
> an issue where the main dag status is stuck in the "running" state, while
> the tasks have all completed successfully.
> To resolve the issue, I had to "delete" the dag entry in the Airflow UI and
> re-run the job manually so the dag run could recognize that the tasks were
> all completed and set itself to successful after the re-run.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Created] (AIRFLOW-1082) Graph legend pushes graph to flat lines if many operators are present
Michael Erdely created AIRFLOW-1082:
------------------------------------

         Summary: Graph legend pushes graph to flat lines if many operators are present
             Key: AIRFLOW-1082
             URL: https://issues.apache.org/jira/browse/AIRFLOW-1082
         Project: Apache Airflow
      Issue Type: Bug
Affects Versions: 1.8.0
        Reporter: Michael Erdely
     Attachments: landing-times.png

If many operators are present, the legend representing these operators pushes
the graph to be flat. This occurs on the Task Duration, Task Tries and Landing
Times tabs. This is new in 1.8.0. See the attachment for reference.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Created] (AIRFLOW-1081) Task duration page is slow
Alex Guziel created AIRFLOW-1081:
---------------------------------

         Summary: Task duration page is slow
             Key: AIRFLOW-1081
             URL: https://issues.apache.org/jira/browse/AIRFLOW-1081
         Project: Apache Airflow
      Issue Type: Bug
        Reporter: Alex Guziel
        Assignee: Alex Guziel

The page makes a number of queries proportional to the data size, instead of
just two.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
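[Editor's note] The "queries proportional to the data size" problem reported above is the classic N+1 query pattern. A minimal plain-Python sketch (hypothetical code, not Airflow's actual implementation; `CountingDB` is an invented stand-in for the database layer) contrasting it with a single batched query:

```python
# Hypothetical sketch of the N+1 query pattern described in AIRFLOW-1081.
# CountingDB is a fake database that only records how many queries it gets.

class CountingDB:
    def __init__(self, durations):
        self.durations = durations      # {task_id: [durations...]}
        self.query_count = 0

    def fetch_one(self, task_id):
        self.query_count += 1           # one round trip per task
        return self.durations[task_id]

    def fetch_all(self, task_ids):
        self.query_count += 1           # one round trip for everything
        return {t: self.durations[t] for t in task_ids}


def durations_slow(db, task_ids):
    # Query count grows linearly with the number of tasks.
    return {t: db.fetch_one(t) for t in task_ids}


def durations_fast(db, task_ids):
    # A single batched query, regardless of the number of tasks.
    return db.fetch_all(task_ids)


data = {"task_%d" % i: [1.0, 2.0] for i in range(100)}

slow_db = CountingDB(data)
durations_slow(slow_db, list(data))
print(slow_db.query_count)   # 100

fast_db = CountingDB(data)
durations_fast(fast_db, list(data))
print(fast_db.query_count)   # 1
```

The fix the ticket implies is to move from the first shape to the second: fetch all rows in a constant number of queries and group them in memory.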
[jira] [Resolved] (AIRFLOW-1028) Databricks Operator for Airflow
[ https://issues.apache.org/jira/browse/AIRFLOW-1028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arthur Wiedmer resolved AIRFLOW-1028.
-------------------------------------
    Resolution: Fixed
 Fix Version/s: 1.9.0

Issue resolved by pull request #2202
[https://github.com/apache/incubator-airflow/pull/2202]

> Databricks Operator for Airflow
> -------------------------------
>
>             Key: AIRFLOW-1028
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1028
>         Project: Apache Airflow
>      Issue Type: New Feature
>        Reporter: Andrew Chen
>        Assignee: Andrew Chen
>         Fix For: 1.9.0
>
> It would be nice to have a Databricks Operator/Hook in Airflow so users of
> Databricks can more easily integrate with Airflow.
> The operator would submit a Spark job to our new /jobs/runs/submit endpoint.
> This endpoint is similar to
> https://docs.databricks.com/api/latest/jobs.html#jobscreatejob but does not
> include the email_notifications, max_retries, min_retry_interval_millis,
> retry_on_timeout, schedule, and max_concurrent_runs fields. (The submit docs
> are not out yet because it is still a private endpoint.)
> Our proposed design for the operator is therefore to match this REST API
> endpoint: each argument to the operator is named after one of the fields of
> the REST API request, and the value of the argument matches the type expected
> by the REST API. We will also merge extra keys from kwargs that should not be
> passed to the BaseOperator into our API call, in order to be flexible to
> updates.
> In case this interface is not very user friendly, we can later add more
> operators which extend this one.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1028) Databricks Operator for Airflow
[ https://issues.apache.org/jira/browse/AIRFLOW-1028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959101#comment-15959101 ]

ASF subversion and git services commented on AIRFLOW-1028:
----------------------------------------------------------

Commit 53ca5084561fd5c13996609f2eda6baf717249b5 in incubator-airflow's branch
refs/heads/master from [~andrewmchen]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=53ca508 ]

[AIRFLOW-1028] Databricks Operator for Airflow

Add DatabricksSubmitRun Operator

In this PR, we contribute a DatabricksSubmitRun operator and a Databricks
hook. This operator enables easy integration of Airflow with Databricks. In
addition to the operator, we have created a databricks_default connection, an
example_dag using this DatabricksSubmitRunOperator, and matching documentation.

Closes #2202 from andrewmchen/databricks-operator-squashed

> Databricks Operator for Airflow
> -------------------------------
>
>             Key: AIRFLOW-1028
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1028
>         Project: Apache Airflow
>      Issue Type: New Feature
>        Reporter: Andrew Chen
>        Assignee: Andrew Chen
>
> It would be nice to have a Databricks Operator/Hook in Airflow so users of
> Databricks can more easily integrate with Airflow.
> The operator would submit a Spark job to our new /jobs/runs/submit endpoint.
> This endpoint is similar to
> https://docs.databricks.com/api/latest/jobs.html#jobscreatejob but does not
> include the email_notifications, max_retries, min_retry_interval_millis,
> retry_on_timeout, schedule, and max_concurrent_runs fields. (The submit docs
> are not out yet because it is still a private endpoint.)
> Our proposed design for the operator is therefore to match this REST API
> endpoint: each argument to the operator is named after one of the fields of
> the REST API request, and the value of the argument matches the type expected
> by the REST API. We will also merge extra keys from kwargs that should not be
> passed to the BaseOperator into our API call, in order to be flexible to
> updates.
> In case this interface is not very user friendly, we can later add more
> operators which extend this one.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
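[Editor's note] The payload design described in the issue above (operator arguments that mirror the fields of the /jobs/runs/submit request, with extra keyword arguments merged into the API call for forward compatibility) can be sketched in plain Python. `build_submit_payload` is a hypothetical helper written for this digest, not the actual operator code:

```python
# Hypothetical sketch of the AIRFLOW-1028 payload design: named arguments
# mirror /jobs/runs/submit request fields, and any extra keyword arguments
# are merged straight into the payload so new API fields keep working.

def build_submit_payload(json=None, new_cluster=None, notebook_task=None,
                         spark_jar_task=None, **extra_api_fields):
    payload = dict(json or {})          # start from the raw JSON, if given
    named = {
        'new_cluster': new_cluster,
        'notebook_task': notebook_task,
        'spark_jar_task': spark_jar_task,
    }
    for key, value in named.items():
        if value is not None:           # named args override/extend the JSON
            payload[key] = value
    payload.update(extra_api_fields)    # unknown keys pass through unchanged
    return payload


payload = build_submit_payload(
    new_cluster={'spark_version': '2.1.0-db3-scala2.11', 'num_workers': 8},
    notebook_task={'notebook_path': '/Users/someone@example.com/PrepareData'},
    timeout_seconds=3600,               # an "extra" key merged into the call
)
```

This mirrors the stated goal: the operator stays a thin, forward-compatible shim over the REST request rather than a curated Python API.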
incubator-airflow git commit: [AIRFLOW-1028] Databricks Operator for Airflow
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 5a6f18f1c -> 53ca50845

[AIRFLOW-1028] Databricks Operator for Airflow

Add DatabricksSubmitRun Operator

In this PR, we contribute a DatabricksSubmitRun operator and a Databricks
hook. This operator enables easy integration of Airflow with Databricks. In
addition to the operator, we have created a databricks_default connection, an
example_dag using this DatabricksSubmitRunOperator, and matching documentation.

Closes #2202 from andrewmchen/databricks-operator-squashed

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/53ca5084
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/53ca5084
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/53ca5084

Branch: refs/heads/master
Commit: 53ca5084561fd5c13996609f2eda6baf717249b5
Parents: 5a6f18f
Author: Andrew Chen
Authored: Thu Apr 6 08:30:01 2017 -0700
Committer: Arthur Wiedmer
Committed: Thu Apr 6 08:30:33 2017 -0700

----------------------------------------------------------------------
 .../example_dags/example_databricks_operator.py |  82 +++
 airflow/contrib/hooks/databricks_hook.py        | 202 +
 .../contrib/operators/databricks_operator.py    | 211 +
 airflow/exceptions.py                           |   2 +-
 airflow/models.py                               |   1 +
 airflow/utils/db.py                             |   4 +
 docs/code.rst                                   |   1 +
 docs/integration.rst                            |  13 ++
 setup.py                                        |   2 +
 tests/contrib/hooks/databricks_hook.py          | 226 +++
 tests/contrib/operators/databricks_operator.py  | 185 +++
 11 files changed, 928 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/53ca5084/airflow/contrib/example_dags/example_databricks_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_databricks_operator.py b/airflow/contrib/example_dags/example_databricks_operator.py
new file mode 100644
index 000..abf6844
--- /dev/null
+++ b/airflow/contrib/example_dags/example_databricks_operator.py
@@ -0,0 +1,82 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import airflow
+
+from airflow import DAG
+from airflow.contrib.operators.databricks_operator import DatabricksSubmitRunOperator
+
+# This is an example DAG which uses the DatabricksSubmitRunOperator.
+# In this example, we create two tasks which execute sequentially.
+# The first task runs a notebook at the workspace path "/test"
+# and the second task runs a JAR uploaded to DBFS. Both tasks
+# use new clusters.
+#
+# Because we have set a downstream dependency on the notebook task,
+# the spark jar task will NOT run until the notebook task completes
+# successfully.
+#
+# The definition of a successful run is a run with a result_state of "SUCCESS".
+# For more information about the state of a run, refer to
+# https://docs.databricks.com/api/latest/jobs.html#runstate
+
+args = {
+    'owner': 'airflow',
+    'email': ['airf...@example.com'],
+    'depends_on_past': False,
+    'start_date': airflow.utils.dates.days_ago(2)
+}
+
+dag = DAG(
+    dag_id='example_databricks_operator', default_args=args,
+    schedule_interval='@daily')
+
+new_cluster = {
+    'spark_version': '2.1.0-db3-scala2.11',
+    'node_type_id': 'r3.xlarge',
+    'aws_attributes': {
+        'availability': 'ON_DEMAND'
+    },
+    'num_workers': 8
+}
+
+notebook_task_params = {
+    'new_cluster': new_cluster,
+    'notebook_task': {
+        'notebook_path': '/Users/airf...@example.com/PrepareData',
+    },
+}
+# Example of using the JSON parameter to initialize the operator.
+notebook_task = DatabricksSubmitRunOperator(
+    task_id='notebook_task',
+    dag=dag,
+    json=notebook_task_params)
+
+# Example of using the named parameters of DatabricksSubmitRunOperator
+# to initialize the operator.
+spark_jar_task = DatabricksSubmitRunOperator(
+    task_id='spark_jar_task',
+    dag=dag,
+    new_cluster=new_cluster,
+    spark_jar_task={
+        'main_class_name': 'com.example.ProcessData'
+
[jira] [Updated] (AIRFLOW-1050) Retries ignored - regression
[ https://issues.apache.org/jira/browse/AIRFLOW-1050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin updated AIRFLOW-1050:
------------------------------------
    Component/s: backfill

> Retries ignored - regression
> ----------------------------
>
>             Key: AIRFLOW-1050
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1050
>         Project: Apache Airflow
>      Issue Type: Bug
>      Components: backfill
>Affects Versions: 1.8.0
>        Reporter: Ján Koščo
>        Assignee: Bolke de Bruin
>        Priority: Blocker
>         Fix For: 1.8.1
>
>     Attachments: Screen Shot 2017-03-28 at 11.15.51.png, Screen Shot
> 2017-03-28 at 11.15.59.png
>
> The SubDag fails when its first operator fails, despite the fact that it is
> configured for retries. The information shown in the UI afterwards is also
> incorrect: from the SubDag's perspective it is still {{running}} with the
> operator marked as {{up_for_retry}}, while from the main DAG's perspective
> the whole run is marked as {{failed}}, the same as the SubDag. See the
> attached screenshots. The latest unaffected version is RC4 (310fb58). I
> tested RC5 and 1.8.0 with the LocalExecutor and CeleryExecutor.
> Example code:
> {code}
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
>
> args = {
>     "start_date": datetime.today(),
> }
>
> dag = DAG(
>     dag_id="main", default_args=args,
>     dagrun_timeout=timedelta(minutes=60),
>     schedule_interval=None,
>     max_active_runs=1
> )
>
> sub_dag = DAG(
>     dag_id="main.test",
>     default_args=args,
>     schedule_interval=None,
> )
>
> op = BashOperator(
>     task_id="first",
>     dag=sub_dag,
>     bash_command="echo 1"
> )
>
> def throw_error():
>     raise RuntimeError()
>
> op2 = PythonOperator(
>     task_id="second",
>     dag=sub_dag,
>     python_callable=throw_error,
>     retries=3,
>     retry_delay=timedelta(0, 20)
> )
>
> op >> op2
>
> prepare_environment = SubDagOperator(
>     task_id='test',
>     subdag=sub_dag,
>     default_args=args,
>     dag=dag,
> )
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
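[Editor's note] In the example DAG above, `retries=3` with `retry_delay=timedelta(0, 20)` asks Airflow for up to three additional attempts, each 20 seconds after the previous failure. A plain-Python illustration of those two parameters (hypothetical helper, not Airflow code; it ignores the time each attempt itself takes):

```python
from datetime import datetime, timedelta

# Hypothetical illustration of retries / retry_delay from the DAG above:
# up to `retries` further attempts, spaced `retry_delay` apart.
def retry_schedule(first_failure, retries, retry_delay):
    return [first_failure + retry_delay * n for n in range(1, retries + 1)]


failed_at = datetime(2017, 3, 28, 11, 0, 0)
times = retry_schedule(failed_at, retries=3, retry_delay=timedelta(0, 20))
print([t.strftime('%H:%M:%S') for t in times])
# ['11:00:20', '11:00:40', '11:01:00']
```

The bug in this ticket is that, inside a SubDag, the scheduler never acted on this schedule at all: the task stayed `up_for_retry` while the enclosing run was marked `failed`.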
[jira] [Assigned] (AIRFLOW-1050) Retries ignored - regression
[ https://issues.apache.org/jira/browse/AIRFLOW-1050?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin reassigned AIRFLOW-1050:
---------------------------------------
    Assignee: Bolke de Bruin

> Retries ignored - regression
> ----------------------------
>
>             Key: AIRFLOW-1050
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1050
>         Project: Apache Airflow
>      Issue Type: Bug
>Affects Versions: 1.8.0
>        Reporter: Ján Koščo
>        Assignee: Bolke de Bruin
>        Priority: Blocker
>         Fix For: 1.8.1
>
>     Attachments: Screen Shot 2017-03-28 at 11.15.51.png, Screen Shot
> 2017-03-28 at 11.15.59.png
>
> The SubDag fails when its first operator fails, despite the fact that it is
> configured for retries. The information shown in the UI afterwards is also
> incorrect: from the SubDag's perspective it is still {{running}} with the
> operator marked as {{up_for_retry}}, while from the main DAG's perspective
> the whole run is marked as {{failed}}, the same as the SubDag. See the
> attached screenshots. The latest unaffected version is RC4 (310fb58). I
> tested RC5 and 1.8.0 with the LocalExecutor and CeleryExecutor.
> Example code:
> {code}
> from datetime import datetime, timedelta
> from airflow.models import DAG
> from airflow.operators.bash_operator import BashOperator
> from airflow.operators.python_operator import PythonOperator
> from airflow.operators.subdag_operator import SubDagOperator
>
> args = {
>     "start_date": datetime.today(),
> }
>
> dag = DAG(
>     dag_id="main", default_args=args,
>     dagrun_timeout=timedelta(minutes=60),
>     schedule_interval=None,
>     max_active_runs=1
> )
>
> sub_dag = DAG(
>     dag_id="main.test",
>     default_args=args,
>     schedule_interval=None,
> )
>
> op = BashOperator(
>     task_id="first",
>     dag=sub_dag,
>     bash_command="echo 1"
> )
>
> def throw_error():
>     raise RuntimeError()
>
> op2 = PythonOperator(
>     task_id="second",
>     dag=sub_dag,
>     python_callable=throw_error,
>     retries=3,
>     retry_delay=timedelta(0, 20)
> )
>
> op >> op2
>
> prepare_environment = SubDagOperator(
>     task_id='test',
>     subdag=sub_dag,
>     default_args=args,
>     dag=dag,
> )
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1075) Cleanup security docs
[ https://issues.apache.org/jira/browse/AIRFLOW-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958803#comment-15958803 ]

ASF subversion and git services commented on AIRFLOW-1075:
----------------------------------------------------------

Commit 5a6f18f1caebc195569be4397bfe8cb36fec3f1a in incubator-airflow's branch
refs/heads/master from [~dxhuang]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=5a6f18f ]

[AIRFLOW-1075] Security docs cleanup

Closes # from dhuang/AIRFLOW-1075

> Cleanup security docs
> ---------------------
>
>             Key: AIRFLOW-1075
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1075
>         Project: Apache Airflow
>      Issue Type: Improvement
>      Components: docs
>        Reporter: Daniel Huang
>        Assignee: Daniel Huang
>        Priority: Trivial
>         Fix For: 1.9.0
>
> Noticed a few minor things to fix, like "Impersonation" being under the "SSL"
> section.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Resolved] (AIRFLOW-1075) Cleanup security docs
[ https://issues.apache.org/jira/browse/AIRFLOW-1075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin resolved AIRFLOW-1075.
-------------------------------------
    Resolution: Fixed
 Fix Version/s: 1.9.0

Issue resolved by pull request #
[https://github.com/apache/incubator-airflow/pull/]

> Cleanup security docs
> ---------------------
>
>             Key: AIRFLOW-1075
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1075
>         Project: Apache Airflow
>      Issue Type: Improvement
>      Components: docs
>        Reporter: Daniel Huang
>        Assignee: Daniel Huang
>        Priority: Trivial
>         Fix For: 1.9.0
>
> Noticed a few minor things to fix, like "Impersonation" being under the "SSL"
> section.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1075] Security docs cleanup
Repository: incubator-airflow
Updated Branches:
  refs/heads/master fbcbd053a -> 5a6f18f1c

[AIRFLOW-1075] Security docs cleanup

Closes # from dhuang/AIRFLOW-1075

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/5a6f18f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/5a6f18f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/5a6f18f1

Branch: refs/heads/master
Commit: 5a6f18f1caebc195569be4397bfe8cb36fec3f1a
Parents: fbcbd05
Author: Daniel Huang
Authored: Thu Apr 6 14:12:13 2017 +0200
Committer: Bolke de Bruin
Committed: Thu Apr 6 14:12:13 2017 +0200

----------------------------------------------------------------------
 docs/security.rst | 62 ++++++++++++++++++++++++++++++------------------------
 1 file changed, 37 insertions(+), 25 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/5a6f18f1/docs/security.rst
----------------------------------------------------------------------
diff --git a/docs/security.rst b/docs/security.rst
index 7c06fe3..c0e2918 100644
--- a/docs/security.rst
+++ b/docs/security.rst
@@ -6,7 +6,7 @@
 to the web application is to do it at the network level, or by using
 SSH tunnels.

 It is however possible to switch on authentication by either using one of the supplied
-backends or create your own.
+backends or creating your own.

 Web Authentication
 ------------------
@@ -89,7 +89,7 @@
 Airflow uses ``flask_login`` and
 exposes a set of hooks in the ``airflow.default_login`` module. You can
-alter the content and make it part of the ``PYTHONPATH`` and configure it as a backend in ``airflow.cfg```.
+alter the content and make it part of the ``PYTHONPATH`` and configure it as a backend in ``airflow.cfg``.

 .. code-block:: bash

@@ -100,12 +100,14 @@
 Multi-tenancy
 -------------

-You can filter the list of dags in webserver by owner name, when authentication
-is turned on, by setting webserver.filter_by_owner as true in your ``airflow.cfg``
-With this, when a user authenticates and logs into webserver, it will see only the dags
-which it is owner of. A super_user, will be able to see all the dags although.
-This makes the web UI a multi-tenant UI, where a user will only be able to see dags
-created by itself.
+You can filter the list of dags in webserver by owner name when authentication
+is turned on by setting ``webserver:filter_by_owner`` in your config. With this, a user will see
+only the dags which it is owner of, unless it is a superuser.
+
+.. code-block:: bash
+
+    [webserver]
+    filter_by_owner = True

 Kerberos
 --------
@@ -118,17 +120,18 @@
 to authenticate against kerberized services.

 Limitations
 '''''''''''

-Please note that at this time not all hooks have been adjusted to make use of this functionality yet.
+Please note that at this time, not all hooks have been adjusted to make use of this functionality.

 Also it does not integrate kerberos into the web interface and you will have to rely on network
 level security for now to make sure your service remains secure.

-Celery integration has not been tried and tested yet. However if you generate a key tab for every host
-and launch a ticket renewer next to every worker it will most likely work.
+Celery integration has not been tried and tested yet. However, if you generate a key tab for every
+host and launch a ticket renewer next to every worker it will most likely work.

 Enabling kerberos
 '''''''''''''''''

- Airflow
+Airflow
+^^^^^^^

 To enable kerberos you will need to generate a (service) key tab.
@@ -160,7 +163,8 @@
 Launch the ticket renewer by

     # run ticket renewer
     airflow kerberos

- Hadoop
+Hadoop
+^^^^^^

 If want to use impersonation this needs to be enabled in ``core-site.xml`` of your hadoop config.
@@ -186,8 +190,8 @@
 Of course if you need to tighten your security replace the asterisk with something more appropriate.

 Using kerberos authentication
 '''''''''''''''''''''''''''''

-The hive hook has been updated to take advantage of kerberos authentication. To allow your DAGs to use it simply
-update the connection details with, for example:
+The hive hook has been updated to take advantage of kerberos authentication. To allow your DAGs to
+use it, simply update the connection details with, for example:

 .. code-block:: bash

@@ -197,7 +201,7 @@
 Adjust the principal to your settings. The _HOST part will be replaced by the fully qualified domain name of
 the server.

 You can specify if you would like to use the dag owner as the user for the connection or the user specified in the login
-section of the connection. For the login user specify the following as extra:
+section of the connection. For the login user, specify the
[jira] [Resolved] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
[ https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin resolved AIRFLOW-1033.
-------------------------------------
    Resolution: Fixed

Issue resolved by pull request #2220
[https://github.com/apache/incubator-airflow/pull/2220]

> TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
> ----------------------------------------------------------------------------
>
>             Key: AIRFLOW-1033
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1033
>         Project: Apache Airflow
>      Issue Type: Bug
>      Components: DagRun
>Affects Versions: 1.8.1
>     Environment: Centos 7;
>                  db: PostgreSQL 9.5
>                  python version: 2.7
>                  Installation via pip
>        Reporter: Bert Desmet
>        Priority: Blocker
>          Labels: bug, interval
>         Fix For: 1.8.1
>
>     Attachments: test_dag.py, test_dag.py.log
>
> Dear,
> When starting a specific new dag, we get the following error:
> [2017-03-23 16:51:16,354] {jobs.py:354} DagFileProcessor908 ERROR - Got an
> exception! Propagating...
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
>     pickle_dags)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in _process_dags
>     self._process_task_instances(dag, tis_out)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in _process_task_instances
>     session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in are_dependencies_met
>     session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in get_failed_dep_statuses
>     dep_context):
>   File "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 94, in get_dep_statuses
>     for dep_status in self._get_dep_statuses(ti, session, dep_context):
>   File "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", line 47, in _get_dep_statuses
>     if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> TypeError: can't compare datetime.datetime to NoneType
> I have added some debug code to the file 'prev_dagrun_dep.py':
>     dag = ti.task.dag
>     print 'Start dates:'
>     print 'previous_exection_date: %s' % (dag.previous_schedule(ti.execution_date))
>     print 'current start date: %s' % (ti.task.start_date)
>     if dag.catchup:
>         if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> And this is the output I get:
>     Start dates:
>     previous_exection_date: None
>     current start date: 2017-03-19 00:00:00
> I think it is normal that the previous_exection_date is null, since it is
> the first time this dag is being run. But why is the start_date of the dag
> important, and not the start date of the run?
> I have the feeling the cause is the 'schedule_interval', which is set to
> None.
> Please find an example and its log file as an attachment to this mail.
> Bert

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
[ https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958795#comment-15958795 ]

ASF subversion and git services commented on AIRFLOW-1033:
----------------------------------------------------------

Commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab in incubator-airflow's branch
refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=fbcbd05 ]

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once) make the dependency
checker raise an exception as the previous schedule will not exist.
Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

> TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
> ----------------------------------------------------------------------------
>
>             Key: AIRFLOW-1033
>             URL: https://issues.apache.org/jira/browse/AIRFLOW-1033
>         Project: Apache Airflow
>      Issue Type: Bug
>      Components: DagRun
>Affects Versions: 1.8.1
>     Environment: Centos 7;
>                  db: PostgreSQL 9.5
>                  python version: 2.7
>                  Installation via pip
>        Reporter: Bert Desmet
>        Priority: Blocker
>          Labels: bug, interval
>         Fix For: 1.8.1
>
>     Attachments: test_dag.py, test_dag.py.log
>
> Dear,
> When starting a specific new dag, we get the following error:
> [2017-03-23 16:51:16,354] {jobs.py:354} DagFileProcessor908 ERROR - Got an
> exception! Propagating...
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper
>     pickle_dags)
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in process_file
>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in _process_dags
>     self._process_task_instances(dag, tis_out)
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in _process_task_instances
>     session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in are_dependencies_met
>     session=session):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in get_failed_dep_statuses
>     dep_context):
>   File "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line 94, in get_dep_statuses
>     for dep_status in self._get_dep_statuses(ti, session, dep_context):
>   File "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", line 47, in _get_dep_statuses
>     if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> TypeError: can't compare datetime.datetime to NoneType
> I have added some debug code to the file 'prev_dagrun_dep.py':
>     dag = ti.task.dag
>     print 'Start dates:'
>     print 'previous_exection_date: %s' % (dag.previous_schedule(ti.execution_date))
>     print 'current start date: %s' % (ti.task.start_date)
>     if dag.catchup:
>         if dag.previous_schedule(ti.execution_date) < ti.task.start_date:
> And this is the output I get:
>     Start dates:
>     previous_exection_date: None
>     current start date: 2017-03-19 00:00:00
> I think it is normal that the previous_exection_date is null, since it is
> the first time this dag is being run. But why is the start_date of the dag
> important, and not the start date of the run?
> I have the feeling the cause is the 'schedule_interval', which is set to
> None.
> Please find an example and its log file as an attachment to this mail.
> Bert

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
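[Editor's note] The commit message above says the fix is to stop evaluating the previous-schedule comparison for DAGs with no schedule (None or @once), since `previous_schedule()` then returns None and comparing None against a datetime raises the TypeError in the traceback. A minimal sketch of that guard (hypothetical code written for this digest, not the actual patch; the function names are invented stand-ins):

```python
from datetime import datetime, timedelta

# Hypothetical stand-in for DAG.previous_schedule(): returns None when the
# DAG has no schedule (schedule_interval=None or @once), else a datetime.
def previous_schedule(execution_date, schedule_interval):
    if schedule_interval is None:
        return None
    return execution_date - schedule_interval


def prev_dagrun_dep_is_first_run(execution_date, schedule_interval, start_date):
    prev = previous_schedule(execution_date, schedule_interval)
    # Guard: with no previous schedule there is nothing earlier to depend on.
    # Comparing None < datetime is exactly what raised the TypeError above.
    if prev is None:
        return True
    return prev < start_date


now = datetime(2017, 3, 23)
# Unscheduled DAG: no TypeError, the dep is simply satisfied.
print(prev_dagrun_dep_is_first_run(now, None, datetime(2017, 3, 19)))
# Scheduled DAG: the comparison proceeds normally (Mar 22 < Mar 19 is False).
print(prev_dagrun_dep_is_first_run(now, timedelta(days=1), datetime(2017, 3, 19)))
```

This also answers the reporter's question: the dep only consults `start_date` to decide whether the current run is effectively the first one, and with no schedule at all that question has no previous run to compare against.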
[jira] [Commented] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
[ https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958796#comment-15958796 ] ASF subversion and git services commented on AIRFLOW-1033: -- Commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab in incubator-airflow's branch refs/heads/master from [~bolke] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=fbcbd05 ] [AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags DAGs that did not have a schedule (None or @once) make the dependency checker raise an exception as the previous schedule will not exist. Also activates all ti_deps tests. Closes #2220 from bolkedebruin/AIRFLOW-1033 > TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py > > > Key: AIRFLOW-1033 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1033 > Project: Apache Airflow > Issue Type: Bug > Components: DagRun >Affects Versions: 1.8.1 > Environment: Centos 7; > db: PostgreSQL 9.5 > python version: 2.7 > Installation via pip >Reporter: Bert Desmet >Priority: Blocker > Labels: bug, interval > Fix For: 1.8.1 > > Attachments: test_dag.py, test_dag.py.log > > > Dear, > When starting a specific new dag we get the following error: > [2017-03-23 16:51:16,354] {jobs.py:354} DagFileProcessor908 ERROR - Got an > exception! Propagating... 
> Traceback (most recent call last): > File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 346, in helper > pickle_dags) > File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in > wrapper > result = func(*args, **kwargs) > File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1581, in > process_file > self._process_dags(dagbag, dags, ti_keys_to_schedule) > File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 1174, in > _process_dags > self._process_task_instances(dag, tis_out) > File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 905, in > _process_task_instances > session=session): > File "/usr/lib/python2.7/site-packages/airflow/utils/db.py", line 53, in > wrapper > result = func(*args, **kwargs) > File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1116, in > are_dependencies_met > session=session): > File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1140, in > get_failed_dep_statuses > dep_context): > File > "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/base_ti_dep.py", line > 94, in get_dep_statuses > for dep_status in self._get_dep_statuses(ti, session, dep_context): > File > "/usr/lib/python2.7/site-packages/airflow/ti_deps/deps/prev_dagrun_dep.py", > line 47, in _get_dep_statuses > if dag.previous_schedule(ti.execution_date) < ti.task.start_date: > TypeError: can't compare datetime.datetime to NoneType > I have added some debug code to the file 'prev_dagrun_dep.py: > dag = ti.task.dag > print 'Start dates:' > print 'previous_exection_date: > %s'%(dag.previous_schedule(ti.execution_date)) > print 'current start date: %s'%(ti.task.start_date) > if dag.catchup: > if dag.previous_schedule(ti.execution_date) < ti.task.start_date: > And this is the output I get: > Start dates: > previous_exection_date: None > current start date: 2017-03-19 00:00:00 > I think it is normall that the previous_exection_date is null, since it is > the first time this dag is being run. 
> But why is the start_date of the dag important, and not the start date of the run?
> I have the feeling the cause is the 'schedule_interval', which is set to None.
> Please find an example and its log file as an attachment to this mail.
> Bert

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
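The failure Bert reports can be sketched in a few lines. The code below is illustrative only — simplified stand-in functions, not the real `PrevDagrunDep` — but it shows why a DAG with `schedule_interval=None` breaks the comparison (`previous_schedule` has nothing to return, and in Python 2 `None < datetime` raises exactly the reported `TypeError`), and the kind of None guard the fix introduces before comparing:

```python
from datetime import datetime, timedelta

def previous_schedule(schedule_interval, execution_date):
    """A DAG with no schedule (None or '@once') has no previous schedule,
    so return None rather than a datetime."""
    if schedule_interval in (None, '@once'):
        return None
    # Stand-in for a real '@daily' interval computation.
    return execution_date - timedelta(days=1)

def depends_on_past_ok(schedule_interval, execution_date, start_date):
    prev = previous_schedule(schedule_interval, execution_date)
    # The guard: bail out before comparing, since comparing None with a
    # datetime raises "TypeError: can't compare datetime.datetime to NoneType".
    if prev is None:
        return True
    # Previous schedule predates the task's start_date: nothing to wait on.
    if prev < start_date:
        return True
    return False  # a previous run exists and must be inspected

print(depends_on_past_ok(None, datetime(2017, 3, 23), datetime(2017, 3, 19)))      # True
print(depends_on_past_ok('@daily', datetime(2017, 3, 23), datetime(2017, 3, 19)))  # False
```

With the guard in place, an unscheduled DAG's first run simply passes the previous-dagrun dependency instead of crashing the DagFileProcessor.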
[jira] [Commented] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
[ https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958798#comment-15958798 ]

ASF subversion and git services commented on AIRFLOW-1033:
--

Commit ebfc3ea73ae1ffe273e4ff532f1ad47441bef518 in incubator-airflow's branch refs/heads/v1-8-test from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ebfc3ea ]

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once) make the dependency checker raise an exception as the previous schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin
incubator-airflow git commit: [AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags
Repository: incubator-airflow
Updated Branches: refs/heads/v1-8-test 916741171 -> ebfc3ea73

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once) make the dependency checker raise an exception as the previous schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ebfc3ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ebfc3ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ebfc3ea7
Branch: refs/heads/v1-8-test
Commit: ebfc3ea73ae1ffe273e4ff532f1ad47441bef518
Parents: 9167411
Author: Bolke de Bruin
Authored: Thu Apr 6 14:03:11 2017 +0200
Committer: Bolke de Bruin
Committed: Thu Apr 6 14:03:24 2017 +0200
--
 airflow/ti_deps/deps/base_ti_dep.py             |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py         |   5 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py  |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py          |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py         |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py   |  61
 tests/ti_deps/deps/not_running_dep.py           |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py           |  38 ---
 tests/ti_deps/deps/pool_has_space_dep.py        |  37 ---
 tests/ti_deps/deps/prev_dagrun_dep.py           | 143 -
 tests/ti_deps/deps/runnable_exec_date_dep.py    |  92 --
 .../deps/test_dag_ti_slots_available_dep.py     |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py     |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py    |  40 +++
 .../deps/test_not_in_retry_period_dep.py        |  59
 tests/ti_deps/deps/test_not_running_dep.py      |  37 +++
 tests/ti_deps/deps/test_not_skipped_dep.py      |  36 +++
 tests/ti_deps/deps/test_prev_dagrun_dep.py      | 123
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  76 +
 tests/ti_deps/deps/test_trigger_rule_dep.py     | 252
 tests/ti_deps/deps/test_valid_state_dep.py      |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py          | 295 ---
 tests/ti_deps/deps/valid_state_dep.py           |  49 ---
 23 files changed, 768 insertions(+), 881 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/airflow/ti_deps/deps/base_ti_dep.py
--
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index 0188043..bad1fa0 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -51,7 +51,7 @@ class BaseTIDep(object):
         """
         return getattr(self, 'NAME', self.__class__.__name__)

-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, session, dep_context=None):
         """
         Abstract method that returns an iterable of TIDepStatus objects that
         describe whether the given task instance has this dependency met.
@@ -69,7 +69,7 @@ class BaseTIDep(object):
         raise NotImplementedError

     @provide_session
-    def get_dep_statuses(self, ti, session, dep_context):
+    def get_dep_statuses(self, ti, session, dep_context=None):
         """
         Wrapper around the private _get_dep_statuses method that contains some
         global checks for all dependencies.
@@ -81,6 +81,12 @@ class BaseTIDep(object):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
+        # this avoids a circular dependency
+        from airflow.ti_deps.dep_context import DepContext
+
+        if dep_context is None:
+            dep_context = DepContext()
+
         if self.IGNOREABLE and dep_context.ignore_all_deps:
             yield self._passing_status(
                 reason="Context specified all dependencies should be ignored.")
@@ -95,7 +101,7 @@ class BaseTIDep(object):
             yield dep_status

     @provide_session
-    def is_met(self, ti, session, dep_context):
+    def is_met(self, ti, session, dep_context=None):
         """
         Returns whether or not this dependency is met for a given task instance.
         A dependency is considered met if all of the dependency statuses it reports are
@@ -113,7 +119,7 @@ class BaseTIDep(object):
             self.get_dep_statuses(ti, session, dep_context))

     @provide_session
-    def get_failure_reasons(self,
[jira] [Commented] (AIRFLOW-1033) TypeError: can't compare datetime.datetime to NoneType in prev_dagrun_dep.py
[ https://issues.apache.org/jira/browse/AIRFLOW-1033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958797#comment-15958797 ]

ASF subversion and git services commented on AIRFLOW-1033:
--

Commit ebfc3ea73ae1ffe273e4ff532f1ad47441bef518 in incubator-airflow's branch refs/heads/v1-8-test from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ebfc3ea ]

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once) make the dependency checker raise an exception as the previous schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin
incubator-airflow git commit: [AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags
Repository: incubator-airflow
Updated Branches: refs/heads/master 4c41f6e96 -> fbcbd053a

[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once) make the dependency checker raise an exception as the previous schedule will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fbcbd053
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fbcbd053
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fbcbd053
Branch: refs/heads/master
Commit: fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab
Parents: 4c41f6e
Author: Bolke de Bruin
Authored: Thu Apr 6 14:03:11 2017 +0200
Committer: Bolke de Bruin
Committed: Thu Apr 6 14:03:11 2017 +0200
--
 airflow/ti_deps/deps/base_ti_dep.py             |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py         |   5 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py  |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py          |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py         |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py   |  61
 tests/ti_deps/deps/not_running_dep.py           |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py           |  38 ---
 tests/ti_deps/deps/pool_has_space_dep.py        |  37 ---
 tests/ti_deps/deps/prev_dagrun_dep.py           | 143 -
 tests/ti_deps/deps/runnable_exec_date_dep.py    |  92 --
 .../deps/test_dag_ti_slots_available_dep.py     |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py     |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py    |  40 +++
 .../deps/test_not_in_retry_period_dep.py        |  59
 tests/ti_deps/deps/test_not_running_dep.py      |  37 +++
 tests/ti_deps/deps/test_not_skipped_dep.py      |  36 +++
 tests/ti_deps/deps/test_prev_dagrun_dep.py      | 123
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  76 +
 tests/ti_deps/deps/test_trigger_rule_dep.py     | 252
 tests/ti_deps/deps/test_valid_state_dep.py      |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py          | 295 ---
 tests/ti_deps/deps/valid_state_dep.py           |  49 ---
 23 files changed, 768 insertions(+), 881 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fbcbd053/airflow/ti_deps/deps/base_ti_dep.py
--
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index d735264..810852d 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -51,7 +51,7 @@ class BaseTIDep(object):
         """
         return getattr(self, 'NAME', self.__class__.__name__)

-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, session, dep_context=None):
         """
         Abstract method that returns an iterable of TIDepStatus objects that
         describe whether the given task instance has this dependency met.
@@ -69,7 +69,7 @@ class BaseTIDep(object):
         raise NotImplementedError

     @provide_session
-    def get_dep_statuses(self, ti, session, dep_context):
+    def get_dep_statuses(self, ti, session, dep_context=None):
         """
         Wrapper around the private _get_dep_statuses method that contains some
         global checks for all dependencies.
@@ -81,6 +81,12 @@ class BaseTIDep(object):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
+        # this avoids a circular dependency
+        from airflow.ti_deps.dep_context import DepContext
+
+        if dep_context is None:
+            dep_context = DepContext()
+
         if self.IGNOREABLE and dep_context.ignore_all_deps:
             yield self._passing_status(
                 reason="Context specified all dependencies should be ignored.")
@@ -95,7 +101,7 @@ class BaseTIDep(object):
             yield dep_status

     @provide_session
-    def is_met(self, ti, session, dep_context):
+    def is_met(self, ti, session, dep_context=None):
         """
         Returns whether or not this dependency is met for a given task instance.
         A dependency is considered met if all of the dependency statuses it reports are
@@ -113,7 +119,7 @@ class BaseTIDep(object):
             self.get_dep_statuses(ti, session, dep_context))

     @provide_session
-    def get_failure_reasons(self, ti, session, dep_context):
+    def get_failure_reasons(self, ti, session, dep_context=None):
         """
         Returns an iterable of strings that explain why this dependency wasn't
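The core of this patch is making `dep_context` optional, with the default `DepContext` built lazily inside the method body. A minimal sketch of that pattern follows, using stand-in classes rather than the real `BaseTIDep` (in Airflow the `DepContext` import also lives inside the method, to break a circular import between the two modules):

```python
# Stand-in for airflow.ti_deps.dep_context.DepContext.
class DepContext(object):
    def __init__(self, ignore_all_deps=False):
        self.ignore_all_deps = ignore_all_deps

# Stand-in for a BaseTIDep subclass; only the defaulting pattern matters here.
class DemoDep(object):
    def get_dep_statuses(self, ti, dep_context=None):
        # Build the default lazily, once per call. A default argument of
        # DepContext() would be evaluated a single time at import and then
        # shared (mutably) across every call.
        if dep_context is None:
            dep_context = DepContext()
        if dep_context.ignore_all_deps:
            yield 'passing: all deps ignored'
            return
        yield 'checking: %s' % ti

print(list(DemoDep().get_dep_statuses('task_a')))
# ['checking: task_a']
print(list(DemoDep().get_dep_statuses('task_a', DepContext(ignore_all_deps=True))))
# ['passing: all deps ignored']
```

Callers such as `are_dependencies_met` can then omit the context entirely, which is what lets the newly activated ti_deps tests construct deps without wiring up a scheduler context.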
[jira] [Created] (AIRFLOW-1080) airflow_local_settings: no effect on task.template_fields changes in policy function
andrei created AIRFLOW-1080:
---

Summary: airflow_local_settings: no effect on task.template_fields changes in policy function
Key: AIRFLOW-1080
URL: https://issues.apache.org/jira/browse/AIRFLOW-1080
Project: Apache Airflow
Issue Type: Bug
Components: configuration
Affects Versions: 1.9.0
Reporter: andrei
Priority: Minor

I am trying to add new template_fields to some operators. I tried to do that using the policy() function, but did not succeed. Maybe it's the wrong way to change existing operators?
{quote}
def policy(task):
    if task.__class__.__name__ == 'FileToGoogleCloudStorageOperator':
        print('I am working!')
        task.template_fields = ('bucket', )
        task.resolve_template_files()
        task.render_templates()
{quote}

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
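For context: a cluster policy is a function named `policy(task)` defined in `airflow_local_settings`, which Airflow calls as tasks are created. The self-contained sketch below uses a hypothetical `FakeOperator` (not the real `FileToGoogleCloudStorageOperator`) to show the mechanics the reporter relies on — assigning `template_fields` on the instance shadows the class attribute for that one task. Whether the scheduler later honors that per-instance change when rendering is exactly the open question in this issue:

```python
# Hypothetical operator stand-in: template_fields is a *class* attribute,
# as it is on real Airflow operators.
class FakeOperator(object):
    template_fields = ()

    def __init__(self, bucket):
        self.bucket = bucket

# A policy hook in the same shape as the reporter's.
def policy(task):
    if task.__class__.__name__ == 'FakeOperator':
        # Instance assignment shadows the class attribute for this task only.
        task.template_fields = ('bucket',)

task = FakeOperator(bucket='{{ ds }}')
policy(task)
print(task.template_fields)          # ('bucket',)
print(FakeOperator.template_fields)  # () -- the class attribute is untouched
```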
incubator-airflow git commit: [AIRFLOW-969] Catch bad python_callable argument
Repository: incubator-airflow
Updated Branches: refs/heads/v1-8-test dff6d21bf -> 916741171

[AIRFLOW-969] Catch bad python_callable argument

Checks for callable when Operator is created, not when it is run.

* added initial PythonOperator unit test, testing run
* python_callable must be callable; added unit test

Closes #2142 from abloomston/python-callable

(cherry picked from commit 12901ddfa9961a11feaa3f17696d19102ff8ecd0)
Signed-off-by: Bolke de Bruin

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/91674117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/91674117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/91674117
Branch: refs/heads/v1-8-test
Commit: 916741171cc0c6426dbcbe8a2b5ce2468fce870d
Parents: dff6d21
Author: abloomston
Authored: Thu Mar 16 19:36:00 2017 -0400
Committer: Bolke de Bruin
Committed: Thu Apr 6 09:47:18 2017 +0200
--
 airflow/operators/python_operator.py | 3 +++
 1 file changed, 3 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91674117/airflow/operators/python_operator.py
--
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index 114bc7e..cf240f2 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -16,6 +16,7 @@ from builtins import str
 from datetime import datetime
 import logging

+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
@@ -63,6 +64,8 @@ class PythonOperator(BaseOperator):
             templates_exts=None,
             *args, **kwargs):
         super(PythonOperator, self).__init__(*args, **kwargs)
+        if not callable(python_callable):
+            raise AirflowException('`python_callable` param must be callable')
         self.python_callable = python_callable
         self.op_args = op_args or []
         self.op_kwargs = op_kwargs or {}
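The effect of the three added lines can be sketched with simplified stand-ins (not the real `PythonOperator` or `AirflowException` — no Airflow install needed): a non-callable `python_callable` now fails when the operator is constructed, i.e. at DAG parse time, instead of surfacing later when the task runs.

```python
# Minimal stand-ins for airflow.exceptions.AirflowException and
# airflow.operators.python_operator.PythonOperator, showing only the
# construction-time check the patch adds.
class AirflowException(Exception):
    pass

class PythonOperator(object):
    def __init__(self, python_callable, **kwargs):
        # Fail fast: reject non-callables before the task ever runs.
        if not callable(python_callable):
            raise AirflowException('`python_callable` param must be callable')
        self.python_callable = python_callable

PythonOperator(python_callable=lambda: 'ok')  # constructs fine

try:
    # A common mistake: passing the *result* of a call, or a plain string.
    PythonOperator(python_callable='not_a_function')
except AirflowException as exc:
    print(exc)  # `python_callable` param must be callable
```

Failing at construction means a bad DAG file is flagged during scheduler parsing rather than producing a confusing task failure later.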
[jira] [Commented] (AIRFLOW-969) Catch bad python_callable argument at DAG construction rather than Task run
[ https://issues.apache.org/jira/browse/AIRFLOW-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15958483#comment-15958483 ]

ASF subversion and git services commented on AIRFLOW-969:
-

Commit 916741171cc0c6426dbcbe8a2b5ce2468fce870d in incubator-airflow's branch refs/heads/v1-8-test from abloomston
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=9167411 ]

[AIRFLOW-969] Catch bad python_callable argument

Checks for callable when Operator is created, not when it is run.

* added initial PythonOperator unit test, testing run
* python_callable must be callable; added unit test

Closes #2142 from abloomston/python-callable

(cherry picked from commit 12901ddfa9961a11feaa3f17696d19102ff8ecd0)
Signed-off-by: Bolke de Bruin

> Catch bad python_callable argument at DAG construction rather than Task run
> ---
>
> Key: AIRFLOW-969
> URL: https://issues.apache.org/jira/browse/AIRFLOW-969
> Project: Apache Airflow
> Issue Type: Improvement
> Components: operators
> Reporter: Adam Bloomston
> Assignee: Adam Bloomston
> Priority: Minor
> Fix For: 1.9.0
>
>
> If a non-callable parameter for python_callable is passed to PythonOperator, it should fail to instantiate. This will move such failures from task run to DAG instantiation. Better to catch such errors sooner rather than later in execution.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)