This is an automated email from the ASF dual-hosted git repository. beto pushed a commit to branch chart-telemetry in repository https://gitbox.apache.org/repos/asf/superset.git
commit ce496a36463489a4a99f75dfd8196468e89a895b Author: Beto Dealmeida <[email protected]> AuthorDate: Wed Mar 13 13:26:03 2024 -0400 feat: chart telemetry --- superset/charts/data/api.py | 22 +++++--- superset/commands/chart/data/get_data_command.py | 4 +- superset/common/query_context_processor.py | 4 ++ superset/extensions/telemetry.py | 68 ++++++++++++++++++++++++ superset/models/core.py | 50 +++++++++-------- superset/utils/decorators.py | 35 +++++++++++- superset/views/base.py | 6 +++ 7 files changed, 157 insertions(+), 32 deletions(-) diff --git a/superset/charts/data/api.py b/superset/charts/data/api.py index 2e46eb2737..a7a021b97a 100644 --- a/superset/charts/data/api.py +++ b/superset/charts/data/api.py @@ -53,7 +53,7 @@ from superset.utils.core import ( get_user_id, json_int_dttm_ser, ) -from superset.utils.decorators import logs_context +from superset.utils.decorators import logs_context, show_telemetry from superset.views.base import CsvResponse, generate_download_headers, XlsxResponse from superset.views.base_api import statsd_metrics @@ -181,6 +181,7 @@ class ChartDataRestApi(ChartRestApi): @expose("/data", methods=("POST",)) @protect() + @show_telemetry @statsd_metrics @event_logger.log_this_with_context( action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.data", @@ -225,6 +226,8 @@ class ChartDataRestApi(ChartRestApi): 500: $ref: '#/components/responses/500' """ + g.telemetry.add("Computing chart data") + json_body = None if request.is_json: json_body = request.json @@ -358,7 +361,8 @@ class ChartDataRestApi(ChartRestApi): # This is needed for sending reports based on text charts that do the # post-processing of data, eg, the pivot table. 
if result_type == ChartDataResultType.POST_PROCESSED: - result = apply_post_process(result, form_data, datasource) + with g.telemetry("Post processing data"): + result = apply_post_process(result, form_data, datasource) if result_format in ChartDataResultFormat.table_like(): # Verify user has permission to export file @@ -396,11 +400,12 @@ class ChartDataRestApi(ChartRestApi): ) if result_format == ChartDataResultFormat.JSON: - response_data = simplejson.dumps( - {"result": result["queries"]}, - default=json_int_dttm_ser, - ignore_nan=True, - ) + with g.telemetry("JSON encoding"): + response_data = simplejson.dumps( + {"result": result["queries"]}, + default=json_int_dttm_ser, + ignore_nan=True, + ) resp = make_response(response_data, 200) resp.headers["Content-Type"] = "application/json; charset=utf-8" return resp @@ -415,7 +420,8 @@ class ChartDataRestApi(ChartRestApi): datasource: BaseDatasource | Query | None = None, ) -> Response: try: - result = command.run(force_cached=force_cached) + with g.telemetry("Running command"): + result = command.run(force_cached=force_cached) except ChartDataCacheLoadError as exc: return self.response_422(message=exc.message) except ChartDataQueryFailedError as exc: diff --git a/superset/commands/chart/data/get_data_command.py b/superset/commands/chart/data/get_data_command.py index 971c343cba..edf1df14a2 100644 --- a/superset/commands/chart/data/get_data_command.py +++ b/superset/commands/chart/data/get_data_command.py @@ -17,6 +17,7 @@ import logging from typing import Any +from flask import g from flask_babel import gettext as _ from superset.commands.base import BaseCommand @@ -43,7 +44,8 @@ class ChartDataCommand(BaseCommand): force_cached = kwargs.get("force_cached", False) try: payload = self._query_context.get_payload( - cache_query_context=cache_query_context, force_cached=force_cached + cache_query_context=cache_query_context, + force_cached=force_cached, ) except CacheLoadError as ex: raise 
ChartDataCacheLoadError(ex.message) from ex diff --git a/superset/common/query_context_processor.py b/superset/common/query_context_processor.py index d8b5bea4bb..543de5c740 100644 --- a/superset/common/query_context_processor.py +++ b/superset/common/query_context_processor.py @@ -23,6 +23,7 @@ from typing import Any, ClassVar, TYPE_CHECKING, TypedDict import numpy as np import pandas as pd +from flask import g from flask_babel import gettext as _ from pandas import DateOffset @@ -168,6 +169,9 @@ class QueryContextProcessor: cache.error_message = str(ex) cache.status = QueryStatus.FAILED + else: + g.telemetry.add("Hit cache") + # the N-dimensional DataFrame has converted into flat DataFrame # by `flatten operator`, "comma" in the column is escaped by `escape_separator` # the result DataFrame columns should be unescaped diff --git a/superset/extensions/telemetry.py b/superset/extensions/telemetry.py new file mode 100644 index 0000000000..aa45ff5671 --- /dev/null +++ b/superset/extensions/telemetry.py @@ -0,0 +1,68 @@ +import time +from collections.abc import Iterator +from contextlib import contextmanager + + +class TelemetryHandler: + """ + Handler for telemetry events. + + To use this, decorate an endpoint with `@show_telemetry`: + + @expose("/") + @show_telemetry + def some_endpoint() -> str: + g.telemetry.add("Processing request") + + with g.telemetry("Computation"): + output = {"answer": some_computation()} + + return jsonify(output) + + The response payload will then look like this: + + { + "answer": 42, + "telemetry": { + 1710345892.8975344: "Processing request", + 1710345893.4794712: "Computation START", + 1710345900.3592598: "Computation END", + }, + } + + """ + + def __init__(self) -> None: + self.events: list[tuple[str, float]] = [] + + @contextmanager + def __call__(self, event: str) -> Iterator[None]: + """ + Context manager for start/end events. 
+ + with g.telemetry("Run query"): + run_query() + + Will produce the events "Run query START" and "Run query END". If the context + manager block raises an exception, "Run query ERROR" will be produced instead of + the latter. + """ + self.add(f"{event} START") + try: + yield + self.add(f"{event} END") + except Exception as ex: + self.add(f"{event} ERROR") + raise ex + + def add(self, event: str) -> None: + """ + Add a single event. + """ + self.events.append((event, time.time())) + + def to_dict(self) -> dict[float, str]: + """ + Convert to a dictionary. + """ + return {event_time: event_name for event_name, event_time in self.events} # diff --git a/superset/models/core.py b/superset/models/core.py index 71a6e9d042..d1019351ec 100755 --- a/superset/models/core.py +++ b/superset/models/core.py @@ -579,35 +579,41 @@ class Database( ) with self.get_raw_connection(schema=schema) as conn: - cursor = conn.cursor() - for sql_ in sqls[:-1]: + with g.telemetry("Executing query"): + cursor = conn.cursor() + for sql_ in sqls[:-1]: + if mutate_after_split: + sql_ = sql_query_mutator( + sql_, + security_manager=security_manager, + database=None, + ) + _log_query(sql_) + self.db_engine_spec.execute(cursor, sql_) + cursor.fetchall() + + if mutate_after_split: - sql_ = sql_query_mutator( - sql_, + last_sql = sql_query_mutator( + sqls[-1], security_manager=security_manager, database=None, ) - _log_query(sql_) - self.db_engine_spec.execute(cursor, sql_) - cursor.fetchall() - - if mutate_after_split: - last_sql = sql_query_mutator( - sqls[-1], - security_manager=security_manager, - database=None, - ) - _log_query(last_sql) - self.db_engine_spec.execute(cursor, last_sql) - else: - _log_query(sqls[-1]) - self.db_engine_spec.execute(cursor, sqls[-1]) + _log_query(last_sql) + self.db_engine_spec.execute(cursor, last_sql) + else: + _log_query(sqls[-1]) + self.db_engine_spec.execute(cursor, sqls[-1]) + + with g.telemetry("Fetching data from cursor"): + data = 
self.db_engine_spec.fetch_data(cursor) - data = self.db_engine_spec.fetch_data(cursor) result_set = SupersetResultSet( - data, cursor.description, self.db_engine_spec + data, + cursor.description, + self.db_engine_spec, ) - df = result_set.to_pandas_df() + with g.telemetry("Loading into dataframe"): + df = result_set.to_pandas_df() if mutator: df = mutator(df) diff --git a/superset/utils/decorators.py b/superset/utils/decorators.py index 7e34b98360..fdc764ba36 100644 --- a/superset/utils/decorators.py +++ b/superset/utils/decorators.py @@ -20,10 +20,11 @@ import logging import time from collections.abc import Iterator from contextlib import contextmanager +from functools import wraps from typing import Any, Callable, TYPE_CHECKING from uuid import UUID -from flask import current_app, g, Response +from flask import current_app, g, jsonify, Response from superset.utils import core as utils from superset.utils.dates import now_as_float @@ -210,3 +211,35 @@ def suppress_logging( yield finally: target_logger.setLevel(original_level) + + +def show_telemetry(f: Callable[..., Any]) -> Callable[..., Any]: + """ + For JSON responses, add telemetry information to the payload. + + This allows us to instrument the stack by adding timestamps at different levels, + eg: + + g.telemetry.add("START_RUN_QUERY") + data = run_query(sql) + g.telemetry.add("END_RUN_QUERY") + + And then we can display this information in the UI. 
+ """ + + @wraps(f) + def wrapped(*args: Any, **kwargs: Any) -> Any: + result = f(*args, **kwargs) + if hasattr(result, "get_json"): + try: + json_data = result.get_json() + except Exception: # pylint: disable=broad-exception-caught + return result + + if isinstance(json_data, dict) and hasattr(g, "telemetry"): + json_data["telemetry"] = g.telemetry.to_dict() + return jsonify(json_data) + + return result + + return wrapped diff --git a/superset/views/base.py b/superset/views/base.py index c8b4862710..1402953492 100644 --- a/superset/views/base.py +++ b/superset/views/base.py @@ -74,6 +74,7 @@ from superset.exceptions import ( SupersetSecurityException, ) from superset.extensions import cache_manager +from superset.extensions.telemetry import TelemetryHandler from superset.models.helpers import ImportExportMixin from superset.reports.models import ReportRecipientType from superset.superset_typing import FlaskResponse @@ -725,3 +726,8 @@ def apply_http_headers(response: Response) -> Response: if k not in response.headers: response.headers[k] = v return response + + +@superset_app.before_request +def start_telemetry() -> None: + g.telemetry = TelemetryHandler()
