Repository: incubator-airflow
Updated Branches:
  refs/heads/master a6487d619 -> 9dba430b6
[AIRFLOW-2253] Add Airflow CLI instrumentation

Closes #3159 from jinhyukchang/cli-instrumentation


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9dba430b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9dba430b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9dba430b

Branch: refs/heads/master
Commit: 9dba430b683361fc0ed7f50de6daa03c971a476b
Parents: a6487d6
Author: Jin Hyuk Chang <jinch...@lyft.com>
Authored: Mon Apr 2 15:16:25 2018 -0700
Committer: Maxime Beauchemin <maximebeauche...@gmail.com>
Committed: Mon Apr 2 15:16:25 2018 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                  |  30 ++++++++
 airflow/settings.py                 |  11 ++-
 airflow/utils/cli.py                | 114 +++++++++++++++++++++++++++++++
 airflow/utils/cli_action_loggers.py | 101 +++++++++++++++++++++++++++
 tests/utils/test_cli_util.py        |  97 ++++++++++++++++++++++++++
 5 files changed, 352 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 24f5d31..eb96e77 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -54,6 +54,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
                             Connection, DAG)
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
+from airflow.utils import cli as cli_utils
 from airflow.utils import db as db_utils
 from airflow.utils.net import get_hostname
 from airflow.utils.log.logging_mixin import (LoggingMixin, redirect_stderr,
@@ -149,6 +150,7 @@ def get_dags(args):
     return matched_dags
 
 
+@cli_utils.action_logging
 def backfill(args, dag=None):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
@@ -190,6 +192,7 @@ def backfill(args, dag=None):
             delay_on_limit_secs=args.delay_on_limit)
 
 
+@cli_utils.action_logging
 def trigger_dag(args):
     """
     Creates a dag run for the specified dag
@@ -208,6 +211,7 @@ def trigger_dag(args):
     log.info(message)
 
 
+@cli_utils.action_logging
 def delete_dag(args):
     """
     Deletes all DB records related to the specified dag
@@ -228,6 +232,7 @@ def delete_dag(args):
         print("Bail.")
 
 
+@cli_utils.action_logging
 def pool(args):
     log = LoggingMixin().log
 
@@ -252,6 +257,7 @@ def pool(args):
     log.info(_tabulate(pools=pools))
 
 
+@cli_utils.action_logging
 def variables(args):
     if args.get:
         try:
@@ -328,10 +334,12 @@ def export_helper(filepath):
     print("{} variables successfully exported to {}".format(len(var_dict), filepath))
 
 
+@cli_utils.action_logging
 def pause(args, dag=None):
     set_is_paused(True, args, dag)
 
 
+@cli_utils.action_logging
 def unpause(args, dag=None):
     set_is_paused(False, args, dag)
 
@@ -401,6 +409,7 @@ def _run(args, dag, ti):
         executor.end()
 
 
+@cli_utils.action_logging
 def run(args, dag=None):
     # Disable connection pooling to reduce the # of connections on the DB
     # while it's waiting for the task to finish.
@@ -453,6 +462,8 @@ def run(args, dag=None):
             _run(args, dag, ti)
     logging.shutdown()
 
+
+@cli_utils.action_logging
 def task_failed_deps(args):
     """
     Returns the unmet dependencies for a task instance from the perspective of the
@@ -479,6 +490,7 @@ def task_failed_deps(args):
         print("Task instance dependencies are all met.")
 
 
+@cli_utils.action_logging
 def task_state(args):
     """
     Returns the state of a TaskInstance at the command line.
@@ -492,6 +504,7 @@ def task_state(args):
     print(ti.current_state())
 
 
+@cli_utils.action_logging
 def dag_state(args):
     """
     Returns the state of a DagRun at the command line.
@@ -504,6 +517,7 @@ def dag_state(args):
     print(dr[0].state if len(dr) > 0 else None)
 
 
+@cli_utils.action_logging
 def list_dags(args):
     dagbag = DagBag(process_subdir(args.subdir))
     s = textwrap.dedent("""\n
@@ -518,6 +532,7 @@ def list_dags(args):
         print(dagbag.dagbag_report())
 
 
+@cli_utils.action_logging
 def list_tasks(args, dag=None):
     dag = dag or get_dag(args)
     if args.tree:
@@ -527,6 +542,7 @@ def list_tasks(args, dag=None):
         print("\n".join(sorted(tasks)))
 
 
+@cli_utils.action_logging
 def test(args, dag=None):
     dag = dag or get_dag(args)
 
@@ -543,6 +559,7 @@ def test(args, dag=None):
         ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
 
 
+@cli_utils.action_logging
 def render(args):
     dag = get_dag(args)
     task = dag.get_task(task_id=args.task_id)
@@ -557,6 +574,7 @@ def render(args):
         """.format(attr, getattr(task, attr))))
 
 
+@cli_utils.action_logging
 def clear(args):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
@@ -713,6 +731,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
             sys.exit(1)
 
 
+@cli_utils.action_logging
 def webserver(args):
     print(settings.HEADER)
 
@@ -838,6 +857,7 @@ def webserver(args):
             monitor_gunicorn(gunicorn_master_proc)
 
 
+@cli_utils.action_logging
 def scheduler(args):
     print(settings.HEADER)
     job = jobs.SchedulerJob(
@@ -871,6 +891,7 @@ def scheduler(args):
         job.run()
 
 
+@cli_utils.action_logging
 def serve_logs(args):
     print("Starting flask")
     import flask
@@ -891,6 +912,7 @@ def serve_logs(args):
         host='0.0.0.0', port=WORKER_LOG_SERVER_PORT)
 
 
+@cli_utils.action_logging
 def worker(args):
     env = os.environ.copy()
     env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME
@@ -937,12 +959,14 @@ def worker(args):
         sp.kill()
 
 
+@cli_utils.action_logging
 def initdb(args):  # noqa
     print("DB: " + repr(settings.engine.url))
     db_utils.initdb(settings.RBAC)
     print("Done.")
 
 
+@cli_utils.action_logging
 def resetdb(args):
     print("DB: " + repr(settings.engine.url))
     if args.yes or input(
@@ -953,6 +977,7 @@ def resetdb(args):
         print("Bail.")
 
 
+@cli_utils.action_logging
 def upgradedb(args):  # noqa
     print("DB: " + repr(settings.engine.url))
     db_utils.upgradedb()
@@ -970,6 +995,7 @@ def upgradedb(args):  # noqa
         session.commit()
 
 
+@cli_utils.action_logging
 def version(args):  # noqa
     print(settings.HEADER + " v" + airflow.__version__)
 
@@ -978,6 +1004,7 @@ alternative_conn_specs = ['conn_type', 'conn_host',
                           'conn_login', 'conn_password', 'conn_schema', 'conn_port']
 
 
+@cli_utils.action_logging
 def connections(args):
     if args.list:
         # Check that no other flags were passed to the command
@@ -1098,6 +1125,7 @@ def connections(args):
         return
 
 
+@cli_utils.action_logging
 def flower(args):
     broka = conf.get('celery', 'BROKER_URL')
     address = '--address={}'.format(args.hostname)
@@ -1139,6 +1167,7 @@ def flower(args):
                              broka, address, port, api, flower_conf, url_prefix])
 
 
+@cli_utils.action_logging
 def kerberos(args):  # noqa
     print(settings.HEADER)
     import airflow.security.kerberos
@@ -1167,6 +1196,7 @@ def kerberos(args):  # noqa
     airflow.security.kerberos.run()
 
 
+@cli_utils.action_logging
 def create_user(args):
     fields = {
         'role': args.role,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 06748ce..1abafa4 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -188,9 +188,17 @@ def configure_adapters():
         pass
 
 
+def configure_action_logging():
+    """
+    Any additional configuration (e.g. registering callbacks) for the
+    airflow.utils.cli_action_loggers module
+    :return: None
+    """
+    pass
+
+
 try:
     from airflow_local_settings import *
-    log.info("Loaded airflow_local_settings.")
 except:
     pass
 
@@ -199,6 +207,7 @@ configure_logging()
 configure_vars()
 configure_adapters()
 configure_orm()
+configure_action_logging()
 
 # Ensure we close DB connections at scheduler and gunicon worker terminations
 atexit.register(dispose_orm)
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/airflow/utils/cli.py
----------------------------------------------------------------------
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
new file mode 100644
index 0000000..5f71bca
--- /dev/null
+++ b/airflow/utils/cli.py
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Utilities module for cli
+"""
+from __future__ import absolute_import
+
+import functools
+import getpass
+import json
+import socket
+import sys
+from argparse import Namespace
+from datetime import datetime
+
+import airflow.models
+from airflow.utils import cli_action_loggers
+
+
+def action_logging(f):
+    """
+    Decorates a CLI sub-command function so that registered action logger
+    callbacks are invoked around it. Callbacks are called twice: once before
+    the function runs (pre-execution) and once after it finishes (post-execution).
+
+    Action logger will be called with below keyword parameters:
+        sub_command : name of sub-command
+        start_datetime : start datetime instance by utc
+        end_datetime : end datetime instance by utc (post-execution only)
+        full_command : full command line arguments
+        user : current user
+        host_name : hostname of the machine running the command
+        log : airflow.models.Log ORM instance
+        dag_id : dag id (optional)
+        task_id : task_id (optional)
+        execution_date : execution date (optional)
+        error : exception instance if there's an exception (post-execution only)
+
+    :param f: function instance
+    :return: wrapped function
+    """
+    @functools.wraps(f)
+    def wrapper(*args, **kwargs):
+        """
+        A wrapper for cli functions. It expects an argparse.Namespace instance
+        as the first positional argument.
+        :param args: Positional arguments; args[0] must be a Namespace instance
+        :param kwargs: A passthrough keyword argument
+        """
+        assert args
+        assert isinstance(args[0], Namespace), \
+            "1st positional argument should be argparse.Namespace instance, " \
+            "but {}".format(args[0])
+        metrics = _build_metrics(f.__name__, args[0])
+        cli_action_loggers.on_pre_execution(**metrics)
+        try:
+            return f(*args, **kwargs)
+        except Exception as e:
+            metrics['error'] = e
+            raise  # bare raise keeps the original traceback (``raise e`` drops it on py2)
+        finally:
+            metrics['end_datetime'] = datetime.utcnow()
+            cli_action_loggers.on_post_execution(**metrics)
+
+    return wrapper
+
+
+def _build_metrics(func_name, namespace):
+    """
+    Builds the metrics dict from function args.
+    It assumes that the function arguments come from an airflow.bin.cli function
+    and include a Namespace instance that optionally contains "dag_id", "task_id",
+    and "execution_date".
+
+    :param func_name: name of function
+    :param namespace: Namespace instance from argparse
+    :return: dict with metrics
+    """
+
+    metrics = {'sub_command': func_name}
+    metrics['start_datetime'] = datetime.utcnow()
+    metrics['full_command'] = '{}'.format(list(sys.argv))
+    metrics['user'] = getpass.getuser()
+
+    assert isinstance(namespace, Namespace)
+    tmp_dic = vars(namespace)
+    metrics['dag_id'] = tmp_dic.get('dag_id')
+    metrics['task_id'] = tmp_dic.get('task_id')
+    metrics['execution_date'] = tmp_dic.get('execution_date')
+    metrics['host_name'] = socket.gethostname()
+
+    extra = json.dumps({k: metrics[k] for k in ('host_name', 'full_command')})
+    log = airflow.models.Log(
+        event='cli_{}'.format(func_name),
+        task_instance=None,
+        owner=metrics['user'],
+        extra=extra,
+        task_id=metrics.get('task_id'),
+        dag_id=metrics.get('dag_id'),
+        execution_date=metrics.get('execution_date'))
+    metrics['log'] = log
+    return metrics

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/airflow/utils/cli_action_loggers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py
new file mode 100644
index 0000000..1bc6865
--- /dev/null
+++ b/airflow/utils/cli_action_loggers.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+An Action Logger module. Callbacks are kept in module-level lists, so
+registered callbacks are shared throughout the same Python process.
+"""
+from __future__ import absolute_import
+
+import logging
+
+import airflow.settings
+
+
+def register_pre_exec_callback(action_logger):
+    """
+    Registers an additional action_logger callback for pre-execution.
+    The callback is called with keyword arguments only.
+    For details on the arguments passed to the callback,
+    refer to airflow.utils.cli.action_logging()
+    :param action_logger: An action logger function
+    :return: None
+    """
+    logging.debug("Adding %s to pre execution callback", action_logger)
+    __pre_exec_callbacks.append(action_logger)
+
+
+def register_post_exec_callback(action_logger):
+    """
+    Registers an additional action_logger callback for post-execution.
+    The callback is called with keyword arguments only.
+    For details on the arguments passed to the callback,
+    refer to airflow.utils.cli.action_logging()
+    :param action_logger: An action logger function
+    :return: None
+    """
+    logging.debug("Adding %s to post execution callback", action_logger)
+    __post_exec_callbacks.append(action_logger)
+
+
+def on_pre_execution(**kwargs):
+    """
+    Calls callbacks before execution.
+    Note that any exception from callback will be logged but won't be propagated.
+    :param kwargs: keyword arguments passed through to every callback
+    :return: None
+    """
+    logging.debug("Calling callbacks: %s", __pre_exec_callbacks)
+    for cb in __pre_exec_callbacks:
+        try:
+            cb(**kwargs)
+        except Exception:
+            logging.exception('Failed on pre-execution callback using %s', cb)
+
+
+def on_post_execution(**kwargs):
+    """
+    Calls callbacks after execution.
+    As it's being called after execution, it can capture status of execution,
+    duration, etc. Note that any exception from callback will be logged but
+    won't be propagated.
+    :param kwargs: keyword arguments passed through to every callback
+    :return: None
+    """
+    logging.debug("Calling callbacks: %s", __post_exec_callbacks)
+    for cb in __post_exec_callbacks:
+        try:
+            cb(**kwargs)
+        except Exception:
+            logging.exception('Failed on post-execution callback using %s', cb)
+
+
+def default_action_log(log, **_):
+    """
+    A default action logger callback that behaves the same as www.utils.action_logging:
+    it uses the global session and pushes the log ORM object.
+    :param log: A log ORM instance
+    :param **_: other keyword arguments that are not used by this function
+    :return: None
+    """
+    session = airflow.settings.Session()
+    session.add(log)
+    session.commit()
+
+
+__pre_exec_callbacks = []
+__post_exec_callbacks = []
+
+# By default, register default action log into pre-execution callback
+register_pre_exec_callback(default_action_log)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/tests/utils/test_cli_util.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
new file mode 100644
index 0000000..c4a76cc
--- /dev/null
+++ b/tests/utils/test_cli_util.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import getpass
+import unittest
+from argparse import Namespace
+from contextlib import contextmanager
+from datetime import datetime
+
+from airflow.utils import cli, cli_action_loggers
+
+
+class CliUtilTest(unittest.TestCase):
+
+    def test_metrics_build(self):
+        func_name = 'test'
+        exec_date = datetime.utcnow()
+        ns = Namespace(dag_id='foo', task_id='bar',
+                       subcommand='test', execution_date=exec_date)
+        metrics = cli._build_metrics(func_name, ns)
+
+        expected = {'user': getpass.getuser(),
+                    'sub_command': 'test',
+                    'dag_id': 'foo',
+                    'task_id': 'bar',
+                    'execution_date': exec_date}
+        for k, v in expected.items():
+            self.assertEqual(v, metrics.get(k))
+
+        self.assertLessEqual(metrics.get('start_datetime'), datetime.utcnow())
+        self.assertTrue(metrics.get('full_command'))
+
+        log_dao = metrics.get('log')
+        self.assertTrue(log_dao)
+        self.assertEqual(log_dao.dag_id, metrics.get('dag_id'))
+        self.assertEqual(log_dao.task_id, metrics.get('task_id'))
+        self.assertEqual(log_dao.execution_date, metrics.get('execution_date'))
+        self.assertEqual(log_dao.owner, metrics.get('user'))
+
+    def test_fail_function(self):
+        """
+        Actual function is failing and fail needs to be propagated.
+        """
+        with self.assertRaises(NotImplementedError):
+            fail_func(Namespace())
+
+    def test_success_function(self):
+        """
+        Test success function but with failing callback.
+        In this case, failure should not propagate.
+        """
+        with fail_action_logger_callback():
+            success_func(Namespace())
+
+
+@contextmanager
+def fail_action_logger_callback():
+    """
+    Temporarily registers a pre-execution callback that always raises.
+    The original callbacks are restored on exit, even if the body raises.
+    """
+    tmp = cli_action_loggers.__pre_exec_callbacks[:]
+
+    def fail_callback(**_):
+        raise NotImplementedError
+
+    cli_action_loggers.register_pre_exec_callback(fail_callback)
+    try:
+        yield
+    finally:
+        cli_action_loggers.__pre_exec_callbacks = tmp
+
+
+@cli.action_logging
+def fail_func(_):
+    raise NotImplementedError
+
+
+@cli.action_logging
+def success_func(_):
+    pass
+
+
+if __name__ == '__main__':
+    unittest.main()