Repository: incubator-airflow Updated Branches: refs/heads/v1-10-test c1264e714 -> 92e2ea60f
[AIRFLOW-74] SubdagOperators can consume all celeryd worker processes Closes #3251 from feng-tao/airflow-74 (cherry picked from commit 64d950166773749c0e4aa0d7032b080cadd56a53) Signed-off-by: Bolke de Bruin <bo...@xs4all.nl> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d35902cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d35902cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d35902cb Branch: refs/heads/v1-10-test Commit: d35902cb4a20afcee977327ff550e540eada5e4e Parents: c1264e7 Author: Tao feng <tf...@lyft.com> Authored: Tue Apr 24 10:13:25 2018 -0700 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Wed Apr 25 12:22:04 2018 +0200 ---------------------------------------------------------------------- UPDATING.md | 2 ++ airflow/operators/subdag_operator.py | 22 ++++++++++++++-------- tests/operators/subdag_operator.py | 19 +++++++++++++------ 3 files changed, 29 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d35902cb/UPDATING.md ---------------------------------------------------------------------- diff --git a/UPDATING.md b/UPDATING.md index 881539f..609c8db 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -5,6 +5,8 @@ assists users migrating to a new version. ## Airflow Master +### Default executor for SubDagOperator is changed to SequentialExecutor + ### New Webserver UI with Role-Based Access Control The current webserver UI uses the Flask-Admin extension. The new webserver UI uses the [Flask-AppBuilder (FAB)](https://github.com/dpgaspar/Flask-AppBuilder) extension. FAB has built-in authentication support and Role-Based Access Control (RBAC), which provides configurable roles and permissions for individual users. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d35902cb/airflow/operators/subdag_operator.py ---------------------------------------------------------------------- diff --git a/airflow/operators/subdag_operator.py b/airflow/operators/subdag_operator.py index c3c7591..052095e 100644 --- a/airflow/operators/subdag_operator.py +++ b/airflow/operators/subdag_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -18,10 +18,10 @@ # under the License. from airflow.exceptions import AirflowException +from airflow.executors.sequential_executor import SequentialExecutor from airflow.models import BaseOperator, Pool from airflow.utils.decorators import apply_defaults from airflow.utils.db import provide_session -from airflow.executors import GetDefaultExecutor class SubDagOperator(BaseOperator): @@ -35,16 +35,19 @@ class SubDagOperator(BaseOperator): def __init__( self, subdag, - executor=GetDefaultExecutor(), + executor=SequentialExecutor(), *args, **kwargs): """ - Yo dawg. This runs a sub dag. By convention, a sub dag's dag_id + This runs a sub dag. By convention, a sub dag's dag_id should be prefixed by its parent and a dot. As in `parent.child`. :param subdag: the DAG object to run as a subdag of the current DAG. - :type subdag: airflow.DAG - :param dag: the parent DAG - :type subdag: airflow.DAG + :type subdag: airflow.DAG. + :param dag: the parent DAG for the subdag. + :type dag: airflow.DAG. + :param executor: the executor for this subdag. Defaults to SequentialExecutor. + See AIRFLOW-74 for more details. + :type executor: airflow.executors. 
""" import airflow.models dag = kwargs.get('dag') or airflow.models._CONTEXT_MANAGER_DAG @@ -88,6 +91,9 @@ class SubDagOperator(BaseOperator): ) self.subdag = subdag + # Airflow pool is not honored by SubDagOperator. + # Hence resources could be consumed by SubDagOperators. + # Use other executors at your own risk. self.executor = executor def execute(self, context): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d35902cb/tests/operators/subdag_operator.py ---------------------------------------------------------------------- diff --git a/tests/operators/subdag_operator.py b/tests/operators/subdag_operator.py index 5b51f1c..af47c5c 100644 --- a/tests/operators/subdag_operator.py +++ b/tests/operators/subdag_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -17,18 +17,16 @@ # specific language governing permissions and limitations # under the License. 
-import os import unittest from mock import Mock import airflow +from airflow.exceptions import AirflowException +from airflow.executors.sequential_executor import SequentialExecutor from airflow.models import DAG, DagBag -from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.subdag_operator import SubDagOperator -from airflow.jobs import BackfillJob -from airflow.exceptions import AirflowException from airflow.utils.timezone import datetime DEFAULT_DATE = datetime(2016, 1, 1) @@ -143,3 +141,12 @@ class SubDagOperatorTests(unittest.TestCase): # now make sure dag picks up the subdag error self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) + + def test_subdag_executor(self): + """ + Test default subdag executor is SequentialExecutor + """ + dag = DAG('parent', default_args=default_args) + subdag_good = DAG('parent.test', default_args=default_args) + subdag = SubDagOperator(task_id='test', dag=dag, subdag=subdag_good) + self.assertEqual(type(subdag.executor), SequentialExecutor)