This is an automated email from the ASF dual-hosted git repository. turbaszek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new e50e946 Task logging handlers can provide custom log links (#9354) e50e946 is described below commit e50e94613a671a86d72e687d9f27fe1cb73ebf36 Author: Mauricio De Diana <mdedi...@gmail.com> AuthorDate: Thu Jul 2 06:15:53 2020 -0300 Task logging handlers can provide custom log links (#9354) Use a mixin to define log handlers based on remote services. The main changes are: - Create RemoteLoggingMixin to define remote log handlers. - Remove explicit mentions to Elasticsearch in dag.html. - Rename the /elasticsearch endpoint in views.py to /redirect_to_remote_log and dispatch the remote URL building to the log handler. Co-authored-by: Kamil Breguła <mik-...@users.noreply.github.com> --- airflow/config_templates/airflow_local_settings.py | 2 + airflow/utils/log/es_task_handler.py | 34 +++++++++- airflow/utils/log/log_reader.py | 8 ++- airflow/utils/log/logging_mixin.py | 16 +++++ airflow/www/templates/airflow/dag.html | 30 +++++---- airflow/www/views.py | 55 ++++++++++++---- docs/howto/write-logs.rst | 26 ++++++++ tests/utils/log/test_es_task_handler.py | 22 +++++++ tests/www/test_views.py | 75 +++++++++++++++++++++- 9 files changed, 235 insertions(+), 33 deletions(-) diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 2374bd90..5633ea9 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -234,6 +234,7 @@ if REMOTE_LOGGING: elif ELASTICSEARCH_HOST: ELASTICSEARCH_LOG_ID_TEMPLATE: str = conf.get('elasticsearch', 'LOG_ID_TEMPLATE') ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get('elasticsearch', 'END_OF_LOG_MARK') + ELASTICSEARCH_FRONTEND: str = conf.get('elasticsearch', 'frontend') ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT') ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT') 
ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch', 'JSON_FIELDS') @@ -247,6 +248,7 @@ if REMOTE_LOGGING: 'filename_template': FILENAME_TEMPLATE, 'end_of_log_mark': ELASTICSEARCH_END_OF_LOG_MARK, 'host': ELASTICSEARCH_HOST, + 'frontend': ELASTICSEARCH_FRONTEND, 'write_stdout': ELASTICSEARCH_WRITE_STDOUT, 'json_format': ELASTICSEARCH_JSON_FORMAT, 'json_fields': ELASTICSEARCH_JSON_FIELDS diff --git a/airflow/utils/log/es_task_handler.py b/airflow/utils/log/es_task_handler.py index 47f970f..b6e1687 100644 --- a/airflow/utils/log/es_task_handler.py +++ b/airflow/utils/log/es_task_handler.py @@ -18,6 +18,7 @@ import logging import sys +from urllib.parse import quote # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test. import elasticsearch @@ -25,14 +26,15 @@ import pendulum from elasticsearch_dsl import Search from airflow.configuration import conf +from airflow.models import TaskInstance from airflow.utils import timezone from airflow.utils.helpers import parse_template_string from airflow.utils.log.file_task_handler import FileTaskHandler from airflow.utils.log.json_formatter import JSONFormatter -from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.log.logging_mixin import ExternalLoggingMixin, LoggingMixin -class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): +class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin, ExternalLoggingMixin): """ ElasticsearchTaskHandler is a python log handler that reads logs from Elasticsearch. 
Note logs are not directly @@ -51,11 +53,13 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): PAGE = 0 MAX_LINE_PER_PAGE = 1000 + LOG_NAME = 'Elasticsearch' - def __init__(self, base_log_folder, filename_template, + def __init__(self, base_log_folder, filename_template, # pylint: disable=too-many-arguments log_id_template, end_of_log_mark, write_stdout, json_format, json_fields, host='localhost:9200', + frontend='localhost:5601', es_kwargs=conf.getsection("elasticsearch_configs")): """ :param base_log_folder: base folder to store logs locally @@ -72,6 +76,7 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): self.client = elasticsearch.Elasticsearch([host], **es_kwargs) + self.frontend = frontend self.mark_end_on_close = True self.end_of_log_mark = end_of_log_mark self.write_stdout = write_stdout @@ -262,3 +267,26 @@ class ElasticsearchTaskHandler(FileTaskHandler, LoggingMixin): super().close() self.closed = True + + @property + def log_name(self): + return self.LOG_NAME + + def get_external_log_url(self, task_instance: TaskInstance, try_number: int) -> str: + """ + Creates an address for an external log collecting service. + + :param task_instance: task instance object + :type task_instance: TaskInstance + :param try_number: task instance try_number to read logs from.
+ :type try_number: Optional[int] + :return: URL to the external log collection service + :rtype: str + """ + log_id = self.log_id_template.format( + dag_id=task_instance.dag_id, + task_id=task_instance.task_id, + execution_date=task_instance.execution_date, + try_number=try_number) + url = 'https://' + self.frontend.format(log_id=quote(log_id)) + return url diff --git a/airflow/utils/log/log_reader.py b/airflow/utils/log/log_reader.py index ee36f31..6aab9e6 100644 --- a/airflow/utils/log/log_reader.py +++ b/airflow/utils/log/log_reader.py @@ -23,6 +23,7 @@ from cached_property import cached_property from airflow.configuration import conf from airflow.models import TaskInstance from airflow.utils.helpers import render_log_filename +from airflow.utils.log.logging_mixin import ExternalLoggingMixin class TaskLogReader: @@ -95,11 +96,16 @@ class TaskLogReader: return handler @property - def is_supported(self): + def supports_read(self): """Checks if a read operation is supported by a current log handler.""" return hasattr(self.log_handler, 'read') + @property + def supports_external_link(self): + """Check if the logging handler supports external links (e.g. to Elasticsearch, Stackdriver, etc).""" + return isinstance(self.log_handler, ExternalLoggingMixin) + def render_log_filename(self, ti: TaskInstance, try_number: Optional[int] = None): """ Renders the log attachment filename diff --git a/airflow/utils/log/logging_mixin.py b/airflow/utils/log/logging_mixin.py index ac6ca59..4ffb4f8 100644 --- a/airflow/utils/log/logging_mixin.py +++ b/airflow/utils/log/logging_mixin.py @@ -15,6 +15,7 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. +import abc import logging import re import sys @@ -58,6 +59,21 @@ class LoggingMixin: set_context(self.log, context) +class ExternalLoggingMixin: + """ + Define a log handler based on an external service (e.g. ELK, StackDriver). 
+ """ + @abc.abstractproperty + def log_name(self) -> str: + """Return log name""" + + @abc.abstractmethod + def get_external_log_url(self, task_instance, try_number) -> str: + """ + Return the URL for log visualization in the external service. + """ + + # TODO: Formally inherit from io.IOBase class StreamLogWriter: """ diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 49dda5a..5ee1b20 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -167,12 +167,14 @@ <hr/> <hr/> </div> - <div id="dag_es_logs"> - <label style="display:inline"> View Logs in Elasticsearch (by attempts): </label> - <ul class="nav nav-pills" role="tablist" id="es_try_index" style="display:inline"> + <div id="dag_redir_logs"> + {% if external_log_name is defined %} + <label style="display:inline"> View Logs in {{ external_log_name }} (by attempts): </label> + <ul class="nav nav-pills" role="tablist" id="redir_log_try_index" style="display:inline"> </ul> <hr/> <hr/> + {% endif %} </div> <form method="POST"> <input name="csrf_token" type="hidden" value="{{ csrf_token() }}"/> @@ -366,9 +368,9 @@ function updateQueryStringParameter(uri, key, value) { var task_id = ''; var execution_date = ''; var subdag_id = ''; - var show_es_logs = false; - {% if show_external_logs is defined %} - show_es_logs = '{{ show_external_logs }}' == "True"; + var show_external_log_redirect = false; + {% if show_external_log_redirect is defined %} + show_external_log_redirect = '{{ show_external_log_redirect }}' == "True"; {% endif %} var buttons = Array.from(document.querySelectorAll('a[id^="btn_"][data-base-url]')).reduce(function(obj, elm) { @@ -441,18 +443,18 @@ function updateQueryStringParameter(uri, key, value) { } $("#dag_dl_logs").hide(); - $("#dag_es_logs").hide(); + $("#dag_redir_logs").hide(); if (try_numbers > 0) { $("#dag_dl_logs").show(); - if (show_es_logs) { - $("#dag_es_logs").show(); + if 
(show_external_log_redirect) { + $("#dag_redir_logs").show(); } } updateModalUrls(); $("#try_index > li").remove(); - $("#es_try_index > li").remove(); + $("#redir_log_try_index > li").remove(); var startIndex = (try_numbers > 2 ? 0 : 1) for (var index = startIndex; index < try_numbers; index++) { var url = "{{ url_for('Airflow.get_logs_with_metadata') }}" + @@ -474,14 +476,14 @@ function updateQueryStringParameter(uri, key, value) { </li>` ); - if (index == 0 || !show_es_logs) continue; - var es_url = "{{ url_for('Airflow.elasticsearch') }}" + + if (index == 0 || !show_external_log_redirect) continue; + var redir_log_url = "{{ url_for('Airflow.redirect_to_external_log') }}" + "?dag_id=" + encodeURIComponent(dag_id) + "&task_id=" + encodeURIComponent(task_id) + "&execution_date=" + encodeURIComponent(execution_date) + "&try_number=" + index; - $("#es_try_index").append(`<li role="presentation" style="display:inline"> - <a href="${es_url}"> ${showLabel} </a> + $("#redir_log_try_index").append(`<li role="presentation" style="display:inline"> + <a href="${redir_log_url}"> ${showLabel} </a> </li>` ); } diff --git a/airflow/www/views.py b/airflow/www/views.py index 591d2e2..3be4e25 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -28,7 +28,7 @@ from collections import defaultdict from datetime import datetime, timedelta from json import JSONDecodeError from typing import Dict, List, Optional, Tuple -from urllib.parse import quote, unquote +from urllib.parse import unquote import lazy_object_proxy import nvd3 @@ -678,7 +678,7 @@ class Airflow(AirflowBaseView): # noqa: D101 return response task_log_reader = TaskLogReader() - if not task_log_reader.is_supported: + if not task_log_reader.supports_read: return jsonify( message="Task log handler does not support read logs.", error=True, @@ -760,21 +760,34 @@ class Airflow(AirflowBaseView): # noqa: D101 execution_date=execution_date, form=form, root=root, wrapped=conf.getboolean('webserver', 'default_wrap')) - 
@expose('/elasticsearch') + @expose('/redirect_to_external_log') @has_dag_access(can_dag_read=True) @has_access @action_logging - def elasticsearch(self): + @provide_session + def redirect_to_external_log(self, session=None): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') + dttm = timezone.parse(execution_date) try_number = request.args.get('try_number', 1) - elasticsearch_frontend = conf.get('elasticsearch', 'frontend') - log_id_template = conf.get('elasticsearch', 'log_id_template') - log_id = log_id_template.format( - dag_id=dag_id, task_id=task_id, - execution_date=execution_date, try_number=try_number) - url = 'https://' + elasticsearch_frontend.format(log_id=quote(log_id)) + + ti = session.query(models.TaskInstance).filter( + models.TaskInstance.dag_id == dag_id, + models.TaskInstance.task_id == task_id, + models.TaskInstance.execution_date == dttm).first() + + if not ti: + flash(f"Task [{dag_id}.{task_id}] does not exist", "error") + return redirect(url_for('Airflow.index')) + + task_log_reader = TaskLogReader() + if not task_log_reader.supports_external_link: + flash("Task log handler does not support external links", "error") + return redirect(url_for('Airflow.index')) + + handler = task_log_reader.log_handler + url = handler.get_external_log_url(ti, try_number) return redirect(url) @expose('/task') @@ -1491,8 +1504,15 @@ class Airflow(AirflowBaseView): # noqa: D101 form = DateTimeWithNumRunsForm(data={'base_date': max_date, 'num_runs': num_runs}) - external_logs = conf.get('elasticsearch', 'frontend') + doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None), css_class='dag-doc') + + task_log_reader = TaskLogReader() + if task_log_reader.supports_external_link: + external_log_name = task_log_reader.log_handler.log_name + else: + external_log_name = None + # avoid spaces to reduce payload size data = htmlsafe_json_dumps(data, separators=(',', ':')) @@ -1505,7 +1525,8 @@ 
class Airflow(AirflowBaseView): # noqa: D101 doc_md=doc_md, data=data, blur=blur, num_runs=num_runs, - show_external_logs=bool(external_logs)) + show_external_log_redirect=task_log_reader.supports_external_link, + external_log_name=external_log_name) @expose('/graph') @has_dag_access(can_dag_read=True) @@ -1587,7 +1608,12 @@ class Airflow(AirflowBaseView): # noqa: D101 session.commit() doc_md = wwwutils.wrapped_markdown(getattr(dag, 'doc_md', None), css_class='dag-doc') - external_logs = conf.get('elasticsearch', 'frontend') + task_log_reader = TaskLogReader() + if task_log_reader.supports_external_link: + external_log_name = task_log_reader.log_handler.log_name + else: + external_log_name = None + return self.render_template( 'airflow/graph.html', dag=dag, @@ -1605,7 +1631,8 @@ class Airflow(AirflowBaseView): # noqa: D101 tasks=tasks, nodes=nodes, edges=edges, - show_external_logs=bool(external_logs)) + show_external_log_redirect=task_log_reader.supports_external_link, + external_log_name=external_log_name) @expose('/duration') @has_dag_access(can_dag_read=True) diff --git a/docs/howto/write-logs.rst b/docs/howto/write-logs.rst index c16a59b..9e0be0e 100644 --- a/docs/howto/write-logs.rst +++ b/docs/howto/write-logs.rst @@ -234,6 +234,7 @@ First, to use the handler, ``airflow.cfg`` must be configured as follows: remote_logging = True [elasticsearch] + host = <host>:<port> log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}} end_of_log_mark = end_of_log write_stdout = @@ -251,6 +252,7 @@ To output task logs to stdout in JSON format, the following config could be used remote_logging = True [elasticsearch] + host = <host>:<port> log_id_template = {{dag_id}}-{{task_id}}-{{execution_date}}-{{try_number}} end_of_log_mark = end_of_log write_stdout = True @@ -318,3 +320,27 @@ be used. By using the ``logging_config_class`` option you can get :ref:`advanced features <write-logs-advanced>` of this handler. 
Details are available in the handler's documentation - :class:`~airflow.utils.log.stackdriver_task_handler.StackdriverTaskHandler`. + +External Links +============== + +When using remote logging, users can configure Airflow to show a link to an external UI within the Airflow Web UI. Clicking the link redirects a user to the external UI. + +Some external systems require specific configuration in Airflow for redirection to work, but others do not. + +.. _log-link-elasticsearch: + +Elasticsearch External Link +------------------------------------ + +A user can configure Airflow to show a link to an Elasticsearch log viewing system (e.g. Kibana). + +To enable it, ``airflow.cfg`` must be configured as in the example below. Note the required ``{log_id}`` in the URL: when constructing the external link, Airflow replaces this parameter with the same ``log_id_template`` used for writing logs (see `Writing Logs to Elasticsearch`_). + +.. code-block:: ini + + [elasticsearch] + # Qualified URL for an elasticsearch frontend (like Kibana) with a template argument for log_id + # Code will construct log_id using the log_id template from the argument above. + # NOTE: The code will prefix the https:// automatically, don't include that here.
+ frontend = <host_port>/{log_id} diff --git a/tests/utils/log/test_es_task_handler.py b/tests/utils/log/test_es_task_handler.py index b4e8fac..8453ca8 100644 --- a/tests/utils/log/test_es_task_handler.py +++ b/tests/utils/log/test_es_task_handler.py @@ -21,9 +21,11 @@ import os import shutil import unittest from unittest import mock +from urllib.parse import quote import elasticsearch import pendulum +from parameterized import parameterized from airflow.configuration import conf from airflow.models import DAG, TaskInstance @@ -346,3 +348,23 @@ class TestElasticsearchTaskHandler(unittest.TestCase): def test_clean_execution_date(self): clean_execution_date = self.es_task_handler._clean_execution_date(datetime(2016, 7, 8, 9, 10, 11, 12)) self.assertEqual('2016_07_08T09_10_11_000012', clean_execution_date) + + @parameterized.expand([ + # Common case + ('localhost:5601/{log_id}', 'https://localhost:5601/' + quote(LOG_ID.replace('T', ' '))), + # Ignore template if "{log_id}"" is missing in the URL + ('localhost:5601', 'https://localhost:5601'), + ]) + def test_get_external_log_url(self, es_frontend, expected_url): + es_task_handler = ElasticsearchTaskHandler( + self.local_log_location, + self.filename_template, + self.log_id_template, + self.end_of_log_mark, + self.write_stdout, + self.json_format, + self.json_fields, + frontend=es_frontend + ) + url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number) + self.assertEqual(expected_url, url) diff --git a/tests/www/test_views.py b/tests/www/test_views.py index 090f3af..155c16c 100644 --- a/tests/www/test_views.py +++ b/tests/www/test_views.py @@ -56,6 +56,7 @@ from airflow.operators.dummy_operator import DummyOperator from airflow.settings import Session from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, RUNNABLE_STATES from airflow.utils import dates, timezone +from airflow.utils.log.logging_mixin import ExternalLoggingMixin from airflow.utils.session import create_session from 
airflow.utils.sqlalchemy import using_mysql from airflow.utils.state import State @@ -992,6 +993,36 @@ class TestAirflowBaseViews(TestBase): self.session.query(DM).filter(DM.dag_id == test_dag_id).update({'dag_id': dag_id}) self.session.commit() + @parameterized.expand(["graph", "tree"]) + def test_show_external_log_redirect_link_with_local_log_handler(self, endpoint): + """Do not show external links if log handler is local.""" + url = f'{endpoint}?dag_id=example_bash_operator' + with self.capture_templates() as templates: + self.client.get(url, follow_redirects=True) + ctx = templates[0].local_context + self.assertFalse(ctx['show_external_log_redirect']) + self.assertIsNone(ctx['external_log_name']) + + @parameterized.expand(["graph", "tree"]) + @mock.patch('airflow.utils.log.log_reader.TaskLogReader.log_handler', new_callable=PropertyMock) + def test_show_external_log_redirect_link_with_external_log_handler(self, endpoint, mock_log_handler): + """Show external links if log handler is external.""" + class ExternalHandler(ExternalLoggingMixin): + LOG_NAME = 'ExternalLog' + + @property + def log_name(self): + return self.LOG_NAME + + mock_log_handler.return_value = ExternalHandler() + + url = f'{endpoint}?dag_id=example_bash_operator' + with self.capture_templates() as templates: + self.client.get(url, follow_redirects=True) + ctx = templates[0].local_context + self.assertTrue(ctx['show_external_log_redirect']) + self.assertEqual(ctx['external_log_name'], ExternalHandler.LOG_NAME) + class TestConfigurationView(TestBase): def test_configuration_do_not_expose_config(self): @@ -1264,7 +1295,7 @@ class TestLogView(TestBase): @mock.patch("airflow.www.views.TaskLogReader") def test_get_logs_for_handler_without_read_method(self, mock_log_reader): - type(mock_log_reader.return_value).is_supported = PropertyMock(return_value=False) + type(mock_log_reader.return_value).supports_read = PropertyMock(return_value=False) url_template = "get_logs_with_metadata?dag_id={}&" \ 
"task_id={}&execution_date={}&" \ @@ -1283,6 +1314,48 @@ class TestLogView(TestBase): 'Task log handler does not support read logs.', response.json['message']) + @parameterized.expand([ + ('inexistent', ), + (TASK_ID, ), + ]) + def test_redirect_to_external_log_with_local_log_handler(self, task_id): + """Redirect to home if TI does not exist or if log handler is local""" + url_template = "redirect_to_external_log?dag_id={}&" \ + "task_id={}&execution_date={}&" \ + "try_number={}" + try_number = 1 + url = url_template.format(self.DAG_ID, + task_id, + quote_plus(self.DEFAULT_DATE.isoformat()), + try_number) + response = self.client.get(url) + + self.assertEqual(302, response.status_code) + self.assertEqual('http://localhost/home', response.headers['Location']) + + @mock.patch('airflow.utils.log.log_reader.TaskLogReader.log_handler', new_callable=PropertyMock) + def test_redirect_to_external_log_with_external_log_handler(self, mock_log_handler): + class ExternalHandler(ExternalLoggingMixin): + EXTERNAL_URL = 'http://external-service.com' + + def get_external_log_url(self, *args, **kwargs): + return self.EXTERNAL_URL + + mock_log_handler.return_value = ExternalHandler() + + url_template = "redirect_to_external_log?dag_id={}&" \ + "task_id={}&execution_date={}&" \ + "try_number={}" + try_number = 1 + url = url_template.format(self.DAG_ID, + self.TASK_ID, + quote_plus(self.DEFAULT_DATE.isoformat()), + try_number) + response = self.client.get(url) + + self.assertEqual(302, response.status_code) + self.assertEqual(ExternalHandler.EXTERNAL_URL, response.headers['Location']) + class TestVersionView(TestBase): def test_version(self):