Repository: incubator-airflow Updated Branches: refs/heads/master c37740f53 -> eb5982d4a
[AIRFLOW-333][AIRFLOW-258] Fix non-module plugin components * Distinguish between module and non-module plugin components * Fix handling of non-module plugin components * admin views, flask blueprints, and menu links need to not be wrapped in modules * Fix improper use of zope.deprecation.deprecated * zope.deprecation.deprecated does NOT support classes as first parameter * deprecating classes must be handled by calling the deprecate function on the class name * Added tests for plugin loading * Updated plugin documentation to match test plugin * Updated executors to always load plugins * More logging Closes #1738 from gwax/plugin_module_fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb5982d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb5982d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb5982d4 Branch: refs/heads/master Commit: eb5982d4aac135051666d2977bbf6adfc8b9e2a7 Parents: c37740f Author: George Leslie-Waksman <geo...@cloverhealth.com> Authored: Sat Oct 1 23:43:20 2016 -0700 Committer: Siddharth Anand <siddharthan...@yahoo.com> Committed: Sat Oct 1 23:43:20 2016 -0700 ---------------------------------------------------------------------- airflow/__init__.py | 1 + airflow/configuration.py | 11 ++++++ airflow/executors/__init__.py | 13 +++++-- airflow/hooks/__init__.py | 19 +++++----- airflow/macros/__init__.py | 18 +++++----- airflow/operators/__init__.py | 18 +++++----- airflow/plugins_manager.py | 37 ++++++++++++-------- airflow/www/app.py | 4 +++ docs/plugins.rst | 17 +++++---- tests/plugins/test_plugin.py | 72 ++++++++++++++++++++++++++++++++++++++ tests/plugins_manager.py | 70 ++++++++++++++++++++++++++++++++++++ 11 files changed, 232 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/__init__.py b/airflow/__init__.py index a8744da..1e40fe9 100644 --- a/airflow/__init__.py +++ b/airflow/__init__.py @@ -82,4 +82,5 @@ from airflow import contrib operators._integrate_plugins() hooks._integrate_plugins() +executors._integrate_plugins() macros._integrate_plugins() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index 09b1b61..c21ae73 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -402,6 +402,7 @@ TEST_CONFIG = """\ unit_test_mode = True airflow_home = {AIRFLOW_HOME} dags_folder = {TEST_DAGS_FOLDER} +plugins_folder = {TEST_PLUGINS_FOLDER} base_log_folder = {AIRFLOW_HOME}/logs executor = SequentialExecutor sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/unittests.db @@ -683,6 +684,16 @@ if os.path.exists(_TEST_DAGS_FOLDER): else: TEST_DAGS_FOLDER = os.path.join(AIRFLOW_HOME, 'dags') +# Set up plugins folder for unit tests +_TEST_PLUGINS_FOLDER = os.path.join( + os.path.dirname(os.path.dirname(os.path.realpath(__file__))), + 'tests', + 'plugins') +if os.path.exists(_TEST_PLUGINS_FOLDER): + TEST_PLUGINS_FOLDER = _TEST_PLUGINS_FOLDER +else: + TEST_PLUGINS_FOLDER = os.path.join(AIRFLOW_HOME, 'plugins') + def parameterized_config(template): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/executors/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/executors/__init__.py b/airflow/executors/__init__.py index 9396198..77f139e 100644 --- a/airflow/executors/__init__.py +++ b/airflow/executors/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import sys from airflow import configuration from airflow.executors.base_executor import BaseExecutor @@ -26,6 +27,14 @@ except: from airflow.exceptions import AirflowException + +def _integrate_plugins(): + """Integrate plugins to the context.""" + from airflow.plugins_manager import executors_modules + for executors_module in executors_modules: + sys.modules[executors_module.__name__] = executors_module + globals()[executors_module._name] = executors_module + _EXECUTOR = configuration.get('core', 'EXECUTOR') if _EXECUTOR == 'LocalExecutor': @@ -39,9 +48,7 @@ elif _EXECUTOR == 'MesosExecutor': DEFAULT_EXECUTOR = MesosExecutor() else: # Loading plugins - from airflow.plugins_manager import executors as _executors - for _executor in _executors: - globals()[_executor.__name__] = _executor + _integrate_plugins() if _EXECUTOR in globals(): DEFAULT_EXECUTOR = globals()[_EXECUTOR]() else: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/hooks/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/hooks/__init__.py b/airflow/hooks/__init__.py index 8942bff..cc09f5a 100644 --- a/airflow/hooks/__init__.py +++ b/airflow/hooks/__init__.py @@ -64,21 +64,24 @@ if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): def _integrate_plugins(): """Integrate plugins to the context""" - from airflow.plugins_manager import hooks as _hooks - for _hook_module in _hooks: - sys.modules[_hook_module.__name__] = _hook_module - globals()[_hook_module._name] = _hook_module + from airflow.plugins_manager import hooks_modules + for hooks_module in hooks_modules: + sys.modules[hooks_module.__name__] = hooks_module + globals()[hooks_module._name] = hooks_module ########################################################## # TODO FIXME Remove in Airflow 2.0 if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): from zope.deprecation import deprecated as _deprecated - for _hook in _hook_module._objects: - globals()[_hook.__name__] = _deprecated( - _hook, + for _hook in hooks_module._objects: + hook_name = _hook.__name__ + globals()[hook_name] = _hook + _deprecated( + hook_name, "Importing plugin hook '{i}' directly from " "'airflow.hooks' has been deprecated. Please " "import from 'airflow.hooks.[plugin_module]' " "instead. Support for direct imports will be dropped " - "entirely in Airflow 2.0.".format(i=_hook)) + "entirely in Airflow 2.0.".format(i=hook_name)) + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/macros/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/macros/__init__.py b/airflow/macros/__init__.py index 0e1ba70..59b9a25 100644 --- a/airflow/macros/__init__.py +++ b/airflow/macros/__init__.py @@ -65,10 +65,10 @@ def ds_format(ds, input_format, output_format): def _integrate_plugins(): """Integrate plugins to the context""" import sys - from airflow.plugins_manager import macros as _macros - for _macro_module in _macros: - sys.modules[_macro_module.__name__] = _macro_module - globals()[_macro_module._name] = _macro_module + from airflow.plugins_manager import macros_modules + for macros_module in macros_modules: + sys.modules[macros_module.__name__] = macros_module + globals()[macros_module._name] = macros_module ########################################################## # TODO FIXME Remove in Airflow 2.0 @@ -76,11 +76,13 @@ def _integrate_plugins(): import os as _os if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): from zope.deprecation import deprecated as _deprecated - for _macro in _macro_module._objects: - globals()[_macro.__name__] = _deprecated( - _macro, + for _macro in macros_module._objects: + macro_name = _macro.__name__ + globals()[macro_name] = _macro + _deprecated( + macro_name, "Importing plugin macro '{i}' directly from " "'airflow.macros' has been deprecated. Please " "import from 'airflow.macros.[plugin_module]' " "instead. Support for direct imports will be dropped " - "entirely in Airflow 2.0.".format(i=_macro)) + "entirely in Airflow 2.0.".format(i=macro_name)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/operators/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/operators/__init__.py b/airflow/operators/__init__.py index 4cfac7b..50b05ff 100644 --- a/airflow/operators/__init__.py +++ b/airflow/operators/__init__.py @@ -101,21 +101,23 @@ if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): def _integrate_plugins(): """Integrate plugins to the context""" - from airflow.plugins_manager import operators as _operators - for _operator_module in _operators: - sys.modules[_operator_module.__name__] = _operator_module - globals()[_operator_module._name] = _operator_module + from airflow.plugins_manager import operators_modules + for operators_module in operators_modules: + sys.modules[operators_module.__name__] = operators_module + globals()[operators_module._name] = operators_module ########################################################## # TODO FIXME Remove in Airflow 2.0 if not _os.environ.get('AIRFLOW_USE_NEW_IMPORTS', False): from zope.deprecation import deprecated as _deprecated - for _operator in _operator_module._objects: - globals()[_operator.__name__] = _deprecated( - _operator, + for _operator in operators_module._objects: + operator_name = _operator.__name__ + globals()[operator_name] = _operator + _deprecated( + operator_name, "Importing plugin operator '{i}' directly from " "'airflow.operators' has been deprecated. Please " "import from 'airflow.operators.[plugin_module]' " "instead. Support for direct imports will be dropped " - "entirely in Airflow 2.0.".format(i=_operator)) + "entirely in Airflow 2.0.".format(i=operator_name)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/plugins_manager.py ---------------------------------------------------------------------- diff --git a/airflow/plugins_manager.py b/airflow/plugins_manager.py index 940aa87..e0af20c 100644 --- a/airflow/plugins_manager.py +++ b/airflow/plugins_manager.py @@ -24,8 +24,6 @@ import logging import os import re import sys -from itertools import chain -merge = chain.from_iterable from airflow import configuration @@ -74,6 +72,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True): if file_ext != '.py': continue + logging.info('Importing plugin module ' + filepath) # normalize root path as namespace namespace = '_'.join([re.sub(norm_pattern, '__', root), mod_name]) @@ -93,6 +92,7 @@ for root, dirs, files in os.walk(plugins_folder, followlinks=True): def make_module(name, objects): + logging.info('Creating module ' + name) name = name.lower() module = imp.new_module(name) module._name = name.split('.')[-1] @@ -100,18 +100,25 @@ def make_module(name, objects): module.__dict__.update((o.__name__, o) for o in objects) return module -operators, hooks, executors, macros, admin_views = [], [], [], [], [] -flask_blueprints, menu_links = [], [] +# Plugin components to integrate as modules +operators_modules = [] +hooks_modules = [] +executors_modules = [] +macros_modules = [] + +# Plugin components to integrate directly +admin_views = [] +flask_blueprints = [] +menu_links = [] for p in plugins: - operators.append(make_module('airflow.operators.' + p.name, p.operators)) - hooks.append(make_module('airflow.hooks.' + p.name, p.hooks)) - executors.append(make_module('airflow.executors.' + p.name, p.executors)) - macros.append(make_module('airflow.macros.' + p.name, p.macros)) - admin_views.append( - make_module('airflow.www.admin_views' + p.name, p.admin_views)) - flask_blueprints.append( - make_module( - 'airflow.www.flask_blueprints' + p.name, p.flask_blueprints)) - menu_links.append( - make_module('airflow.www.menu_links' + p.name, p.menu_links)) + operators_modules.append( + make_module('airflow.operators.' + p.name, p.operators)) + hooks_modules.append(make_module('airflow.hooks.' + p.name, p.hooks)) + executors_modules.append( + make_module('airflow.executors.' + p.name, p.executors)) + macros_modules.append(make_module('airflow.macros.' + p.name, p.macros)) + + admin_views.extend(p.admin_views) + flask_blueprints.extend(p.flask_blueprints) + menu_links.extend(p.menu_links) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/airflow/www/app.py ---------------------------------------------------------------------- diff --git a/airflow/www/app.py b/airflow/www/app.py index 10d8420..c4dff6f 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +import logging import socket from flask import Flask @@ -109,10 +110,13 @@ def create_app(config=None): from airflow.plugins_manager import ( admin_views, flask_blueprints, menu_links) for v in admin_views: + logging.info('Adding view ' + v.name) admin.add_view(v) for bp in flask_blueprints: + logging.info('Adding blueprint ' + bp.name) app.register_blueprint(bp) for ml in sorted(menu_links, key=lambda x: x.name): + logging.info('Adding menu link ' + ml.name) admin.add_link(ml) integrate_plugins() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/docs/plugins.rst ---------------------------------------------------------------------- diff --git a/docs/plugins.rst b/docs/plugins.rst index 5dde383..8d2078f 100644 --- a/docs/plugins.rst +++ b/docs/plugins.rst @@ -96,18 +96,22 @@ definitions in Airflow. from airflow.models import BaseOperator from airflow.executors.base_executor import BaseExecutor - # Will show up under airflow.hooks.PluginHook + # Will show up under airflow.hooks.test_plugin.PluginHook class PluginHook(BaseHook): pass - # Will show up under airflow.operators.PluginOperator + # Will show up under airflow.operators.test_plugin.PluginOperator class PluginOperator(BaseOperator): pass - # Will show up under airflow.executors.PluginExecutor + # Will show up under airflow.executors.test_plugin.PluginExecutor class PluginExecutor(BaseExecutor): pass + # Will show up under airflow.macros.test_plugin.plugin_macro + def plugin_macro(): + pass + # Creating a flask admin BaseView class TestView(BaseView): @expose('/') @@ -119,10 +123,10 @@ definitions in Airflow. # Creating a flask blueprint to intergrate the templates and static folder bp = Blueprint( "test_plugin", __name__, - template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder + template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder static_folder='static', static_url_path='/static/test_plugin') - + ml = MenuLink( category='Test Plugin', name='Test Menu Link', @@ -132,8 +136,9 @@ definitions in Airflow. class AirflowTestPlugin(AirflowPlugin): name = "test_plugin" operators = [PluginOperator] - flask_blueprints = [bp] hooks = [PluginHook] executors = [PluginExecutor] + macros = [plugin_macro] admin_views = [v] + flask_blueprints = [bp] menu_links = [ml] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/tests/plugins/test_plugin.py ---------------------------------------------------------------------- diff --git a/tests/plugins/test_plugin.py b/tests/plugins/test_plugin.py new file mode 100644 index 0000000..e628919 --- /dev/null +++ b/tests/plugins/test_plugin.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This is the class you derive to create a plugin +from airflow.plugins_manager import AirflowPlugin + +from flask import Blueprint +from flask_admin import BaseView, expose +from flask_admin.base import MenuLink + +# Importing base classes that we need to derive +from airflow.hooks.base_hook import BaseHook +from airflow.models import BaseOperator +from airflow.executors.base_executor import BaseExecutor + +# Will show up under airflow.hooks.test_plugin.PluginHook +class PluginHook(BaseHook): + pass + +# Will show up under airflow.operators.test_plugin.PluginOperator +class PluginOperator(BaseOperator): + pass + +# Will show up under airflow.executors.test_plugin.PluginExecutor +class PluginExecutor(BaseExecutor): + pass + +# Will show up under airflow.macros.test_plugin.plugin_macro +def plugin_macro(): + pass + +# Creating a flask admin BaseView +class TestView(BaseView): + @expose('/') + def test(self): + # in this example, put your test_plugin/test.html template at airflow/plugins/templates/test_plugin/test.html + return self.render("test_plugin/test.html", content="Hello galaxy!") +v = TestView(category="Test Plugin", name="Test View") + +# Creating a flask blueprint to intergrate the templates and static folder +bp = Blueprint( + "test_plugin", __name__, + template_folder='templates', # registers airflow/plugins/templates as a Jinja template folder + static_folder='static', + static_url_path='/static/test_plugin') + +ml = MenuLink( + category='Test Plugin', + name='Test Menu Link', + url='http://pythonhosted.org/airflow/') + +# Defining the plugin class +class AirflowTestPlugin(AirflowPlugin): + name = "test_plugin" + operators = [PluginOperator] + hooks = [PluginHook] + executors = [PluginExecutor] + macros = [plugin_macro] + admin_views = [v] + flask_blueprints = [bp] + menu_links = [ml] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb5982d4/tests/plugins_manager.py ---------------------------------------------------------------------- diff --git a/tests/plugins_manager.py b/tests/plugins_manager.py new file mode 100644 index 0000000..0012cdf --- /dev/null +++ b/tests/plugins_manager.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import inspect +import logging +import unittest + +from flask.blueprints import Blueprint +from flask_admin import BaseView +from flask_admin.menu import MenuLink, MenuView + +from airflow.hooks.base_hook import BaseHook +from airflow.models import BaseOperator +from airflow.executors.base_executor import BaseExecutor +from airflow.www.app import cached_app + + +class PluginsTest(unittest.TestCase): + + def test_operators(self): + from airflow.operators.test_plugin import PluginOperator + assert issubclass(PluginOperator, BaseOperator) + + def test_hooks(self): + from airflow.hooks.test_plugin import PluginHook + assert issubclass(PluginHook, BaseHook) + + def test_executors(self): + from airflow.executors.test_plugin import PluginExecutor + assert issubclass(PluginExecutor, BaseExecutor) + + def test_macros(self): + from airflow.macros.test_plugin import plugin_macro + assert callable(plugin_macro) + + def test_admin_views(self): + app = cached_app() + [admin] = app.extensions['admin'] + category = admin._menu_categories['Test Plugin'] + [admin_view] = [v for v in category.get_children() + if isinstance(v, MenuView)] + assert admin_view.name == 'Test View' + + def test_flask_blueprints(self): + app = cached_app() + assert isinstance(app.blueprints['test_plugin'], Blueprint) + + def test_menu_links(self): + app = cached_app() + [admin] = app.extensions['admin'] + category = admin._menu_categories['Test Plugin'] + [menu_link] = [ml for ml in category.get_children() + if isinstance(ml, MenuLink)] + assert menu_link.name == 'Test Menu Link'