Repository: incubator-airflow
Updated Branches:
  refs/heads/master a6487d619 -> 9dba430b6


[AIRFLOW-2253] Add Airflow CLI instrumentation

Closes #3159 from jinhyukchang/cli-instrumentation


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9dba430b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9dba430b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9dba430b

Branch: refs/heads/master
Commit: 9dba430b683361fc0ed7f50de6daa03c971a476b
Parents: a6487d6
Author: Jin Hyuk Chang <jinch...@lyft.com>
Authored: Mon Apr 2 15:16:25 2018 -0700
Committer: Maxime Beauchemin <maximebeauche...@gmail.com>
Committed: Mon Apr 2 15:16:25 2018 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                  |  30 ++++++++
 airflow/settings.py                 |  11 ++-
 airflow/utils/cli.py                | 114 +++++++++++++++++++++++++++++++
 airflow/utils/cli_action_loggers.py | 101 +++++++++++++++++++++++++++
 tests/utils/test_cli_util.py        |  97 ++++++++++++++++++++++++++
 5 files changed, 352 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
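
For orientation, a minimal sketch (not part of this commit) of how the
pieces below fit together: a callback registered with cli_action_loggers
fires around any CLI function wrapped with the new decorator. The names
my_callback and my_command are hypothetical; note that running this also
exercises the default callback, which commits a Log row and therefore
assumes an initialized Airflow metadata DB.

    from argparse import Namespace

    from airflow.utils import cli as cli_utils
    from airflow.utils import cli_action_loggers

    def my_callback(**kwargs):
        # Receives sub_command, full_command, user, start_datetime, etc.
        print("CLI action:", kwargs.get("sub_command"), "by", kwargs.get("user"))

    cli_action_loggers.register_pre_exec_callback(my_callback)

    @cli_utils.action_logging
    def my_command(args):
        print("running against dag_id={}".format(args.dag_id))

    my_command(Namespace(dag_id="example", task_id=None, execution_date=None))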


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 24f5d31..eb96e77 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -54,6 +54,7 @@ from airflow.models import (DagModel, DagBag, TaskInstance,
                             Connection, DAG)
 
 from airflow.ti_deps.dep_context import (DepContext, SCHEDULER_DEPS)
+from airflow.utils import cli as cli_utils
 from airflow.utils import db as db_utils
 from airflow.utils.net import get_hostname
 from airflow.utils.log.logging_mixin import (LoggingMixin, redirect_stderr,
@@ -149,6 +150,7 @@ def get_dags(args):
     return matched_dags
 
 
+@cli_utils.action_logging
 def backfill(args, dag=None):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
@@ -190,6 +192,7 @@ def backfill(args, dag=None):
             delay_on_limit_secs=args.delay_on_limit)
 
 
+@cli_utils.action_logging
 def trigger_dag(args):
     """
     Creates a dag run for the specified dag
@@ -208,6 +211,7 @@ def trigger_dag(args):
     log.info(message)
 
 
+@cli_utils.action_logging
 def delete_dag(args):
     """
     Deletes all DB records related to the specified dag
@@ -228,6 +232,7 @@ def delete_dag(args):
         print("Bail.")
 
 
+@cli_utils.action_logging
 def pool(args):
     log = LoggingMixin().log
 
@@ -252,6 +257,7 @@ def pool(args):
         log.info(_tabulate(pools=pools))
 
 
+@cli_utils.action_logging
 def variables(args):
     if args.get:
         try:
@@ -328,10 +334,12 @@ def export_helper(filepath):
     print("{} variables successfully exported to {}".format(len(var_dict), 
filepath))
 
 
+@cli_utils.action_logging
 def pause(args, dag=None):
     set_is_paused(True, args, dag)
 
 
+@cli_utils.action_logging
 def unpause(args, dag=None):
     set_is_paused(False, args, dag)
 
@@ -401,6 +409,7 @@ def _run(args, dag, ti):
         executor.end()
 
 
+@cli_utils.action_logging
 def run(args, dag=None):
     # Disable connection pooling to reduce the # of connections on the DB
     # while it's waiting for the task to finish.
@@ -453,6 +462,8 @@ def run(args, dag=None):
             _run(args, dag, ti)
         logging.shutdown()
 
+
+@cli_utils.action_logging
 def task_failed_deps(args):
     """
     Returns the unmet dependencies for a task instance from the perspective of the
@@ -479,6 +490,7 @@ def task_failed_deps(args):
         print("Task instance dependencies are all met.")
 
 
+@cli_utils.action_logging
 def task_state(args):
     """
     Returns the state of a TaskInstance at the command line.
@@ -492,6 +504,7 @@ def task_state(args):
     print(ti.current_state())
 
 
+@cli_utils.action_logging
 def dag_state(args):
     """
     Returns the state of a DagRun at the command line.
@@ -504,6 +517,7 @@ def dag_state(args):
     print(dr[0].state if len(dr) > 0 else None)
 
 
+@cli_utils.action_logging
 def list_dags(args):
     dagbag = DagBag(process_subdir(args.subdir))
     s = textwrap.dedent("""\n
@@ -518,6 +532,7 @@ def list_dags(args):
         print(dagbag.dagbag_report())
 
 
+@cli_utils.action_logging
 def list_tasks(args, dag=None):
     dag = dag or get_dag(args)
     if args.tree:
@@ -527,6 +542,7 @@ def list_tasks(args, dag=None):
         print("\n".join(sorted(tasks)))
 
 
+@cli_utils.action_logging
 def test(args, dag=None):
     dag = dag or get_dag(args)
 
@@ -543,6 +559,7 @@ def test(args, dag=None):
         ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
 
 
+@cli_utils.action_logging
 def render(args):
     dag = get_dag(args)
     task = dag.get_task(task_id=args.task_id)
@@ -557,6 +574,7 @@ def render(args):
         """.format(attr, getattr(task, attr))))
 
 
+@cli_utils.action_logging
 def clear(args):
     logging.basicConfig(
         level=settings.LOGGING_LEVEL,
@@ -713,6 +731,7 @@ def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout):
             sys.exit(1)
 
 
+@cli_utils.action_logging
 def webserver(args):
     print(settings.HEADER)
 
@@ -838,6 +857,7 @@ def webserver(args):
             monitor_gunicorn(gunicorn_master_proc)
 
 
+@cli_utils.action_logging
 def scheduler(args):
     print(settings.HEADER)
     job = jobs.SchedulerJob(
@@ -871,6 +891,7 @@ def scheduler(args):
         job.run()
 
 
+@cli_utils.action_logging
 def serve_logs(args):
     print("Starting flask")
     import flask
@@ -891,6 +912,7 @@ def serve_logs(args):
         host='0.0.0.0', port=WORKER_LOG_SERVER_PORT)
 
 
+@cli_utils.action_logging
 def worker(args):
     env = os.environ.copy()
     env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME
@@ -937,12 +959,14 @@ def worker(args):
         sp.kill()
 
 
+@cli_utils.action_logging
 def initdb(args):  # noqa
     print("DB: " + repr(settings.engine.url))
     db_utils.initdb(settings.RBAC)
     print("Done.")
 
 
+@cli_utils.action_logging
 def resetdb(args):
     print("DB: " + repr(settings.engine.url))
     if args.yes or input(
@@ -953,6 +977,7 @@ def resetdb(args):
         print("Bail.")
 
 
+@cli_utils.action_logging
 def upgradedb(args):  # noqa
     print("DB: " + repr(settings.engine.url))
     db_utils.upgradedb()
@@ -970,6 +995,7 @@ def upgradedb(args):  # noqa
         session.commit()
 
 
+@cli_utils.action_logging
 def version(args):  # noqa
     print(settings.HEADER + "  v" + airflow.__version__)
 
@@ -978,6 +1004,7 @@ alternative_conn_specs = ['conn_type', 'conn_host',
                           'conn_login', 'conn_password', 'conn_schema', 'conn_port']
 
 
+@cli_utils.action_logging
 def connections(args):
     if args.list:
         # Check that no other flags were passed to the command
@@ -1098,6 +1125,7 @@ def connections(args):
         return
 
 
+@cli_utils.action_logging
 def flower(args):
     broka = conf.get('celery', 'BROKER_URL')
     address = '--address={}'.format(args.hostname)
@@ -1139,6 +1167,7 @@ def flower(args):
                              broka, address, port, api, flower_conf, url_prefix])
 
 
+@cli_utils.action_logging
 def kerberos(args):  # noqa
     print(settings.HEADER)
     import airflow.security.kerberos
@@ -1167,6 +1196,7 @@ def kerberos(args):  # noqa
         airflow.security.kerberos.run()
 
 
+@cli_utils.action_logging
 def create_user(args):
     fields = {
         'role': args.role,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 06748ce..1abafa4 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -188,9 +188,17 @@ def configure_adapters():
         pass
 
 
+def configure_action_logging():
+    """
+    Any additional configuration (register callback) for the
+    airflow.utils.cli_action_loggers module.
+    :return: None
+    """
+    pass
+
+
 try:
     from airflow_local_settings import *
-
     log.info("Loaded airflow_local_settings.")
 except:
     pass
@@ -199,6 +207,7 @@ configure_logging()
 configure_vars()
 configure_adapters()
 configure_orm()
+configure_action_logging()
 
 # Ensure we close DB connections at scheduler and gunicorn worker terminations
 atexit.register(dispose_orm)
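
Because "from airflow_local_settings import *" runs before
configure_action_logging() is invoked, a deployment can replace the no-op
hook above by defining its own configure_action_logging in
airflow_local_settings.py. A hedged sketch (emit_audit_event is a
hypothetical name):

    # airflow_local_settings.py
    from airflow.utils import cli_action_loggers

    def emit_audit_event(sub_command=None, user=None, **kwargs):
        # Forward CLI usage to whatever audit sink the deployment uses.
        print("audit: {} invoked by {}".format(sub_command, user))

    def configure_action_logging():
        cli_action_loggers.register_pre_exec_callback(emit_audit_event)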

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/airflow/utils/cli.py
----------------------------------------------------------------------
diff --git a/airflow/utils/cli.py b/airflow/utils/cli.py
new file mode 100644
index 0000000..5f71bca
--- /dev/null
+++ b/airflow/utils/cli.py
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Utilities module for cli
+"""
+from __future__ import absolute_import
+
+import functools
+import getpass
+import json
+import socket
+import sys
+from argparse import Namespace
+from datetime import datetime
+
+import airflow.models
+from airflow.utils import cli_action_loggers
+
+
+def action_logging(f):
+    """
+    Decorates a function so that, when executed in a CLI context, action
+    logging is submitted as well. The action logger callbacks are called
+    twice: once for pre-execution and once for post-execution.
+
+    Action loggers will be called with the following keyword parameters:
+        sub_command : name of sub-command
+        start_datetime : start datetime instance in UTC
+        end_datetime : end datetime instance in UTC
+        full_command : full command line arguments
+        user : current user
+        log : airflow.models.Log ORM instance
+        dag_id : dag id (optional)
+        task_id : task_id (optional)
+        execution_date : execution date (optional)
+        error : exception instance if there's an exception
+
+    :param f: function instance
+    :return: wrapped function
+    """
+    @functools.wraps(f)
+    def wrapper(*args, **kwargs):
+        """
+        An wrapper for cli functions. It assumes to have Namespace instance
+        at 1st positional argument
+        :param args: Positional argument. It assumes to have Namespace instance
+        at 1st positional argument
+        :param kwargs: A passthrough keyword argument
+        """
+        assert args
+        assert isinstance(args[0], Namespace), \
+            "1st positional argument should be argparse.Namespace instance, " \
+            "but {}".format(args[0])
+        metrics = _build_metrics(f.__name__, args[0])
+        cli_action_loggers.on_pre_execution(**metrics)
+        try:
+            return f(*args, **kwargs)
+        except Exception as e:
+            metrics['error'] = e
+            raise  # bare raise preserves the original traceback
+        finally:
+            metrics['end_datetime'] = datetime.utcnow()
+            cli_action_loggers.on_post_execution(**metrics)
+
+    return wrapper
+
+
+def _build_metrics(func_name, namespace):
+    """
+    Builds a metrics dict from the function args.
+    It assumes the function arguments come from an airflow.bin.cli module
+    function and include a Namespace instance that optionally contains
+    "dag_id", "task_id", and "execution_date".
+
+    :param func_name: name of function
+    :param namespace: Namespace instance from argparse
+    :return: dict with metrics
+    """
+
+    metrics = {'sub_command': func_name}
+    metrics['start_datetime'] = datetime.utcnow()
+    metrics['full_command'] = '{}'.format(list(sys.argv))
+    metrics['user'] = getpass.getuser()
+
+    assert isinstance(namespace, Namespace)
+    tmp_dic = vars(namespace)
+    metrics['dag_id'] = tmp_dic.get('dag_id')
+    metrics['task_id'] = tmp_dic.get('task_id')
+    metrics['execution_date'] = tmp_dic.get('execution_date')
+    metrics['host_name'] = socket.gethostname()
+
+    extra = json.dumps(dict((k, metrics[k]) for k in ('host_name', 'full_command')))
+    log = airflow.models.Log(
+        event='cli_{}'.format(func_name),
+        task_instance=None,
+        owner=metrics['user'],
+        extra=extra,
+        task_id=metrics.get('task_id'),
+        dag_id=metrics.get('dag_id'),
+        execution_date=metrics.get('execution_date'))
+    metrics['log'] = log
+    return metrics
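
One detail worth calling out in the decorator above: when the wrapped
function raises, the exception is recorded in metrics['error'] and the
post-execution callbacks still run before the exception propagates. A small
sketch (capture and boom are hypothetical names):

    from argparse import Namespace

    from airflow.utils import cli as cli_utils
    from airflow.utils import cli_action_loggers

    def capture(**kwargs):
        # 'error' is only present when the wrapped function raised.
        if kwargs.get('error'):
            print("failed after",
                  kwargs['end_datetime'] - kwargs['start_datetime'])

    cli_action_loggers.register_post_exec_callback(capture)

    @cli_utils.action_logging
    def boom(args):
        raise RuntimeError("simulated failure")

    try:
        boom(Namespace())
    except RuntimeError:
        pass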

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/airflow/utils/cli_action_loggers.py
----------------------------------------------------------------------
diff --git a/airflow/utils/cli_action_loggers.py b/airflow/utils/cli_action_loggers.py
new file mode 100644
index 0000000..1bc6865
--- /dev/null
+++ b/airflow/utils/cli_action_loggers.py
@@ -0,0 +1,101 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+An Action Logger module. Singleton pattern has been applied into this module
+so that registered callbacks can be used all through the same python process.
+"""
+from __future__ import absolute_import
+
+import logging
+
+import airflow.settings
+
+
+def register_pre_exec_callback(action_logger):
+    """
+    Registers an additional action_logger function callback for pre-execution.
+    This function callback is expected to be called with keyword args.
+    For more about the arguments passed to the callback,
+    refer to airflow.utils.cli.action_logging().
+    :param action_logger: An action logger function
+    :return: None
+    """
+    logging.debug("Adding {} to pre execution callback".format(action_logger))
+    __pre_exec_callbacks.append(action_logger)
+
+
+def register_post_exec_callback(action_logger):
+    """
+    Registers an additional action_logger function callback for post-execution.
+    This function callback is expected to be called with keyword args.
+    For more about the arguments passed to the callback,
+    refer to airflow.utils.cli.action_logging().
+    :param action_logger: An action logger function
+    :return: None
+    """
+    logging.debug("Adding {} to post execution callback".format(action_logger))
+    __post_exec_callbacks.append(action_logger)
+
+
+def on_pre_execution(**kwargs):
+    """
+    Calls callbacks before execution.
+    Note that any exception raised by a callback is logged but not propagated.
+    :param kwargs: keyword arguments passed through to each callback
+    :return: None
+    """
+    logging.debug("Calling callbacks: {}".format(__pre_exec_callbacks))
+    for cb in __pre_exec_callbacks:
+        try:
+            cb(**kwargs)
+        except Exception:
+            logging.exception('Failed on pre-execution callback using {}'.format(cb))
+
+
+def on_post_execution(**kwargs):
+    """
+    Calls callbacks after execution.
+    As it is called after execution, it can capture the status of the
+    execution, its duration, etc. Note that any exception raised by a
+    callback is logged but not propagated.
+    :param kwargs: keyword arguments passed through to each callback
+    :return: None
+    """
+    logging.debug("Calling callbacks: {}".format(__post_exec_callbacks))
+    for cb in __post_exec_callbacks:
+        try:
+            cb(**kwargs)
+        except Exception:
+            logging.exception('Failed on post-execution callback using {}'.format(cb))
+
+
+def default_action_log(log, **_):
+    """
+    A default action logger callback that behaves the same as
+    airflow.www.utils.action_logging: it uses the global session and
+    pushes a Log ORM object.
+    :param log: A Log ORM instance
+    :param **_: other keyword arguments that are not used by this function
+    :return: None
+    """
+    session = airflow.settings.Session()
+    session.add(log)
+    session.commit()
+
+
+__pre_exec_callbacks = []
+__post_exec_callbacks = []
+
+# By default, register default action log into pre-execution callback
+register_pre_exec_callback(default_action_log)
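
Any callable that accepts keyword arguments can serve as a callback. As a
hedged sketch, here is one that forwards a summary line to the standard
logging module (log_cli_usage and the "cli_audit" logger name are
hypothetical):

    import logging

    from airflow.utils import cli_action_loggers

    audit_log = logging.getLogger("cli_audit")

    def log_cli_usage(sub_command=None, user=None, full_command=None, **_):
        # Mirrors the keyword-argument contract documented in
        # airflow.utils.cli.action_logging().
        audit_log.info("%s ran %s: %s", user, sub_command, full_command)

    cli_action_loggers.register_pre_exec_callback(log_cli_usage)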

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9dba430b/tests/utils/test_cli_util.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_cli_util.py b/tests/utils/test_cli_util.py
new file mode 100644
index 0000000..c4a76cc
--- /dev/null
+++ b/tests/utils/test_cli_util.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+import unittest
+from argparse import Namespace
+from contextlib import contextmanager
+from datetime import datetime
+
+from airflow.utils import cli, cli_action_loggers
+
+
+class CliUtilTest(unittest.TestCase):
+
+    def test_metrics_build(self):
+        func_name = 'test'
+        exec_date = datetime.utcnow()
+        ns = Namespace(dag_id='foo', task_id='bar',
+                       subcommand='test', execution_date=exec_date)
+        metrics = cli._build_metrics(func_name, ns)
+
+        expected = {'user': os.environ.get('USER'),
+                    'sub_command': 'test',
+                    'dag_id': 'foo',
+                    'task_id': 'bar',
+                    'execution_date': exec_date}
+        for k, v in expected.items():
+            self.assertEqual(v, metrics.get(k))
+
+        self.assertTrue(metrics.get('start_datetime') <= datetime.utcnow())
+        self.assertTrue(metrics.get('full_command'))
+
+        log_dao = metrics.get('log')
+        self.assertTrue(log_dao)
+        self.assertEqual(log_dao.dag_id, metrics.get('dag_id'))
+        self.assertEqual(log_dao.task_id, metrics.get('task_id'))
+        self.assertEqual(log_dao.execution_date, metrics.get('execution_date'))
+        self.assertEqual(log_dao.owner, metrics.get('user'))
+
+    def test_fail_function(self):
+        """
+        Actual function is failing and fail needs to be propagated.
+        :return:
+        """
+        with self.assertRaises(NotImplementedError):
+            fail_func(Namespace())
+
+    def test_success_function(self):
+        """
+        Test success function but with failing callback.
+        In this case, failure should not propagate.
+        :return:
+        """
+        with fail_action_logger_callback():
+            success_func(Namespace())
+
+
+@contextmanager
+def fail_action_logger_callback():
+    """
+    Adding failing callback and revert it back when closed.
+    :return:
+    """
+    tmp = cli_action_loggers.__pre_exec_callbacks[:]
+
+    def fail_callback(**_):
+        raise NotImplementedError
+
+    cli_action_loggers.register_pre_exec_callback(fail_callback)
+    yield
+    cli_action_loggers.__pre_exec_callbacks = tmp
+
+
+@cli.action_logging
+def fail_func(_):
+    raise NotImplementedError
+
+
+@cli.action_logging
+def success_func(_):
+    pass
+
+
+if __name__ == '__main__':
+    unittest.main()
