Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test c1264e714 -> 92e2ea60f


[AIRFLOW-74] SubdagOperators can consume all celeryd worker processes

Closes #3251 from feng-tao/airflow-74

(cherry picked from commit 64d950166773749c0e4aa0d7032b080cadd56a53)
Signed-off-by: Bolke de Bruin <bo...@xs4all.nl>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d35902cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d35902cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d35902cb

Branch: refs/heads/v1-10-test
Commit: d35902cb4a20afcee977327ff550e540eada5e4e
Parents: c1264e7
Author: Tao feng <tf...@lyft.com>
Authored: Tue Apr 24 10:13:25 2018 -0700
Committer: Bolke de Bruin <bo...@xs4all.nl>
Committed: Wed Apr 25 12:22:04 2018 +0200

----------------------------------------------------------------------
 UPDATING.md                          |  2 ++
 airflow/operators/subdag_operator.py | 22 ++++++++++++++--------
 tests/operators/subdag_operator.py   | 19 +++++++++++++------
 3 files changed, 29 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d35902cb/UPDATING.md
----------------------------------------------------------------------
diff --git a/UPDATING.md b/UPDATING.md
index 881539f..609c8db 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -5,6 +5,8 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Default executor for SubDagOperator is changed to SequentialExecutor
+
 ### New Webserver UI with Role-Based Access Control
 
 The current webserver UI uses the Flask-Admin extension. The new webserver UI 
uses the [Flask-AppBuilder (FAB)](https://github.com/dpgaspar/Flask-AppBuilder) 
extension. FAB has built-in authentication support and Role-Based Access 
Control (RBAC), which provides configurable roles and permissions for 
individual users.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d35902cb/airflow/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/airflow/operators/subdag_operator.py 
b/airflow/operators/subdag_operator.py
index c3c7591..052095e 100644
--- a/airflow/operators/subdag_operator.py
+++ b/airflow/operators/subdag_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,10 +18,10 @@
 # under the License.
 
 from airflow.exceptions import AirflowException
+from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models import BaseOperator, Pool
 from airflow.utils.decorators import apply_defaults
 from airflow.utils.db import provide_session
-from airflow.executors import GetDefaultExecutor
 
 
 class SubDagOperator(BaseOperator):
@@ -35,16 +35,19 @@ class SubDagOperator(BaseOperator):
     def __init__(
             self,
             subdag,
-            executor=GetDefaultExecutor(),
+            executor=SequentialExecutor(),
             *args, **kwargs):
         """
-        Yo dawg. This runs a sub dag. By convention, a sub dag's dag_id
+        This runs a sub dag. By convention, a sub dag's dag_id
         should be prefixed by its parent and a dot. As in `parent.child`.
 
         :param subdag: the DAG object to run as a subdag of the current DAG.
-        :type subdag: airflow.DAG
-        :param dag: the parent DAG
-        :type subdag: airflow.DAG
+        :type subdag: airflow.DAG.
+        :param dag: the parent DAG for the subdag.
+        :type dag: airflow.DAG.
+        :param executor: the executor for this subdag. Default to use 
SequentialExecutor.
+                         Please find AIRFLOW-74 for more details.
+        :type executor: airflow.executors.
         """
         import airflow.models
         dag = kwargs.get('dag') or airflow.models._CONTEXT_MANAGER_DAG
@@ -88,6 +91,9 @@ class SubDagOperator(BaseOperator):
                     )
 
         self.subdag = subdag
+        # Airflow pool is not honored by SubDagOperator.
+        # Hence resources could be consumed by SubdagOperators
+        # Use other executor with your own risk.
         self.executor = executor
 
     def execute(self, context):

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d35902cb/tests/operators/subdag_operator.py
----------------------------------------------------------------------
diff --git a/tests/operators/subdag_operator.py 
b/tests/operators/subdag_operator.py
index 5b51f1c..af47c5c 100644
--- a/tests/operators/subdag_operator.py
+++ b/tests/operators/subdag_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -17,18 +17,16 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import os
 import unittest
 
 from mock import Mock
 
 import airflow
+from airflow.exceptions import AirflowException
+from airflow.executors.sequential_executor import SequentialExecutor
 from airflow.models import DAG, DagBag
-from airflow.operators.bash_operator import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators.subdag_operator import SubDagOperator
-from airflow.jobs import BackfillJob
-from airflow.exceptions import AirflowException
 from airflow.utils.timezone import datetime
 
 DEFAULT_DATE = datetime(2016, 1, 1)
@@ -143,3 +141,12 @@ class SubDagOperatorTests(unittest.TestCase):
 
         # now make sure dag picks up the subdag error
         self.assertRaises(AirflowException, dag.run, start_date=DEFAULT_DATE, 
end_date=DEFAULT_DATE)
+
+    def test_subdag_executor(self):
+        """
+        Test default subdag executor is SequentialExecutor
+        """
+        dag = DAG('parent', default_args=default_args)
+        subdag_good = DAG('parent.test', default_args=default_args)
+        subdag = SubDagOperator(task_id='test', dag=dag, subdag=subdag_good)
+        self.assertEqual(type(subdag.executor), SequentialExecutor)

Reply via email to