This is an automated email from the ASF dual-hosted git repository.

beto pushed a commit to branch chart-telemetry
in repository https://gitbox.apache.org/repos/asf/superset.git

commit ce496a36463489a4a99f75dfd8196468e89a895b
Author: Beto Dealmeida <[email protected]>
AuthorDate: Wed Mar 13 13:26:03 2024 -0400

    feat: chart telemetry
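
    Adds a TelemetryHandler that is attached to `g` at the start of every
    request, plus a `show_telemetry` decorator that merges the collected
    events into JSON responses. A minimal usage sketch, mirroring the
    TelemetryHandler docstring (the endpoint and `some_computation` are
    illustrative):

        @expose("/")
        @show_telemetry
        def some_endpoint() -> str:
            g.telemetry.add("Processing request")

            with g.telemetry("Computation"):
                output = {"answer": some_computation()}

            return jsonify(output)

    The response payload then gains a "telemetry" key mapping event
    timestamps to event names.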
---
 superset/charts/data/api.py                      | 22 +++++---
 superset/commands/chart/data/get_data_command.py |  4 +-
 superset/common/query_context_processor.py       |  4 ++
 superset/extensions/telemetry.py                 | 68 ++++++++++++++++++++++++
 superset/models/core.py                          | 50 +++++++++--------
 superset/utils/decorators.py                     | 35 +++++++++++-
 superset/views/base.py                           |  6 +++
 7 files changed, 157 insertions(+), 32 deletions(-)

diff --git a/superset/charts/data/api.py b/superset/charts/data/api.py
index 2e46eb2737..a7a021b97a 100644
--- a/superset/charts/data/api.py
+++ b/superset/charts/data/api.py
@@ -53,7 +53,7 @@ from superset.utils.core import (
     get_user_id,
     json_int_dttm_ser,
 )
-from superset.utils.decorators import logs_context
+from superset.utils.decorators import logs_context, show_telemetry
 from superset.views.base import CsvResponse, generate_download_headers, XlsxResponse
 from superset.views.base_api import statsd_metrics
 
@@ -181,6 +181,7 @@ class ChartDataRestApi(ChartRestApi):
 
     @expose("/data", methods=("POST",))
     @protect()
+    @show_telemetry
     @statsd_metrics
     @event_logger.log_this_with_context(
         action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.data",
@@ -225,6 +226,8 @@ class ChartDataRestApi(ChartRestApi):
             500:
               $ref: '#/components/responses/500'
         """
+        g.telemetry.add("Computing chart data")
+
         json_body = None
         if request.is_json:
             json_body = request.json
@@ -358,7 +361,8 @@ class ChartDataRestApi(ChartRestApi):
         # This is needed for sending reports based on text charts that do the
         # post-processing of data, eg, the pivot table.
         if result_type == ChartDataResultType.POST_PROCESSED:
-            result = apply_post_process(result, form_data, datasource)
+            with g.telemetry("Post processing data"):
+                result = apply_post_process(result, form_data, datasource)
 
         if result_format in ChartDataResultFormat.table_like():
             # Verify user has permission to export file
@@ -396,11 +400,12 @@ class ChartDataRestApi(ChartRestApi):
             )
 
         if result_format == ChartDataResultFormat.JSON:
-            response_data = simplejson.dumps(
-                {"result": result["queries"]},
-                default=json_int_dttm_ser,
-                ignore_nan=True,
-            )
+            with g.telemetry("JSON encoding"):
+                response_data = simplejson.dumps(
+                    {"result": result["queries"]},
+                    default=json_int_dttm_ser,
+                    ignore_nan=True,
+                )
             resp = make_response(response_data, 200)
             resp.headers["Content-Type"] = "application/json; charset=utf-8"
             return resp
@@ -415,7 +420,8 @@ class ChartDataRestApi(ChartRestApi):
         datasource: BaseDatasource | Query | None = None,
     ) -> Response:
         try:
-            result = command.run(force_cached=force_cached)
+            with g.telemetry("Running command"):
+                result = command.run(force_cached=force_cached)
         except ChartDataCacheLoadError as exc:
             return self.response_422(message=exc.message)
         except ChartDataQueryFailedError as exc:
diff --git a/superset/commands/chart/data/get_data_command.py b/superset/commands/chart/data/get_data_command.py
index 971c343cba..edf1df14a2 100644
--- a/superset/commands/chart/data/get_data_command.py
+++ b/superset/commands/chart/data/get_data_command.py
@@ -17,6 +17,7 @@
 import logging
 from typing import Any
 
+from flask import g
 from flask_babel import gettext as _
 
 from superset.commands.base import BaseCommand
@@ -43,7 +44,8 @@ class ChartDataCommand(BaseCommand):
         force_cached = kwargs.get("force_cached", False)
         try:
             payload = self._query_context.get_payload(
-                cache_query_context=cache_query_context, force_cached=force_cached
+                cache_query_context=cache_query_context,
+                force_cached=force_cached,
             )
         except CacheLoadError as ex:
             raise ChartDataCacheLoadError(ex.message) from ex
diff --git a/superset/common/query_context_processor.py b/superset/common/query_context_processor.py
index d8b5bea4bb..543de5c740 100644
--- a/superset/common/query_context_processor.py
+++ b/superset/common/query_context_processor.py
@@ -23,6 +23,7 @@ from typing import Any, ClassVar, TYPE_CHECKING, TypedDict
 
 import numpy as np
 import pandas as pd
+from flask import g
 from flask_babel import gettext as _
 from pandas import DateOffset
 
@@ -168,6 +169,9 @@ class QueryContextProcessor:
                 cache.error_message = str(ex)
                 cache.status = QueryStatus.FAILED
 
+        else:
+            g.telemetry.add("Hit cache")
+
         # the N-dimensional DataFrame has converted into flat DataFrame
         # by `flatten operator`, "comma" in the column is escaped by `escape_separator`
         # the result DataFrame columns should be unescaped
diff --git a/superset/extensions/telemetry.py b/superset/extensions/telemetry.py
new file mode 100644
index 0000000000..aa45ff5671
--- /dev/null
+++ b/superset/extensions/telemetry.py
@@ -0,0 +1,68 @@
+import time
+from collections.abc import Iterator
+from contextlib import contextmanager
+
+
+class TelemetryHandler:
+    """
+    Handler for telemetry events.
+
+    To use this, decorate an endpoint with `@show_telemetry`:
+
+        @expose("/")
+        @show_telemetry
+        def some_endpoint() -> str:
+            g.telemetry.add("Processing request")
+
+            with g.telemetry("Computation"):
+                output = {"answer": some_computation()}
+
+            return jsonify(output)
+
+    The response payload will then look like this:
+
+        {
+            "answer": 42,
+            "telemetry": {
+                1710345892.8975344: "Processing request",
+                1710345893.4794712: "Computation START",
+                1710345900.3592598: "Computation END",
+            },
+        }
+
+    """
+
+    def __init__(self) -> None:
+        self.events: list[tuple[str, float]] = []
+
+    @contextmanager
+    def __call__(self, event: str) -> Iterator[None]:
+        """
+        Context manager for start/end events.
+
+            with g.telemetry("Run query"):
+                run_query()
+
+        Will produce the events "Run query START" and "Run query END". If the context
+        manager block raises an exception, "Run query ERROR" will be produced instead
+        of the latter.
+        """
+        self.add(f"{event} START")
+        try:
+            yield
+            self.add(f"{event} END")
+        except Exception:
+            self.add(f"{event} ERROR")
+            raise
+
+    def add(self, event: str) -> None:
+        """
+        Add a single event.
+        """
+        self.events.append((event, time.time()))
+
+    def to_dict(self) -> dict[float, str]:
+        """
+        Convert to a dictionary.
+        """
+        return {event_time: event_name for event_name, event_time in self.events}
diff --git a/superset/models/core.py b/superset/models/core.py
index 71a6e9d042..d1019351ec 100755
--- a/superset/models/core.py
+++ b/superset/models/core.py
@@ -579,35 +579,41 @@ class Database(
                 )
 
         with self.get_raw_connection(schema=schema) as conn:
-            cursor = conn.cursor()
-            for sql_ in sqls[:-1]:
+            with g.telemetry("Executing query"):
+                cursor = conn.cursor()
+                for sql_ in sqls[:-1]:
+                    if mutate_after_split:
+                        sql_ = sql_query_mutator(
+                            sql_,
+                            security_manager=security_manager,
+                            database=None,
+                        )
+                    _log_query(sql_)
+                    self.db_engine_spec.execute(cursor, sql_)
+                    cursor.fetchall()
+
                 if mutate_after_split:
-                    sql_ = sql_query_mutator(
-                        sql_,
+                    last_sql = sql_query_mutator(
+                        sqls[-1],
                         security_manager=security_manager,
                         database=None,
                     )
-                _log_query(sql_)
-                self.db_engine_spec.execute(cursor, sql_)
-                cursor.fetchall()
-
-            if mutate_after_split:
-                last_sql = sql_query_mutator(
-                    sqls[-1],
-                    security_manager=security_manager,
-                    database=None,
-                )
-                _log_query(last_sql)
-                self.db_engine_spec.execute(cursor, last_sql)
-            else:
-                _log_query(sqls[-1])
-                self.db_engine_spec.execute(cursor, sqls[-1])
+                    _log_query(last_sql)
+                    self.db_engine_spec.execute(cursor, last_sql)
+                else:
+                    _log_query(sqls[-1])
+                    self.db_engine_spec.execute(cursor, sqls[-1])
+
+            with g.telemetry("Fetching data from cursor"):
+                data = self.db_engine_spec.fetch_data(cursor)
 
-            data = self.db_engine_spec.fetch_data(cursor)
             result_set = SupersetResultSet(
-                data, cursor.description, self.db_engine_spec
+                data,
+                cursor.description,
+                self.db_engine_spec,
             )
-            df = result_set.to_pandas_df()
+            with g.telemetry("Loading into dataframe"):
+                df = result_set.to_pandas_df()
             if mutator:
                 df = mutator(df)
 
diff --git a/superset/utils/decorators.py b/superset/utils/decorators.py
index 7e34b98360..fdc764ba36 100644
--- a/superset/utils/decorators.py
+++ b/superset/utils/decorators.py
@@ -20,10 +20,11 @@ import logging
 import time
 from collections.abc import Iterator
 from contextlib import contextmanager
+from functools import wraps
 from typing import Any, Callable, TYPE_CHECKING
 from uuid import UUID
 
-from flask import current_app, g, Response
+from flask import current_app, g, jsonify, Response
 
 from superset.utils import core as utils
 from superset.utils.dates import now_as_float
@@ -210,3 +211,35 @@ def suppress_logging(
         yield
     finally:
         target_logger.setLevel(original_level)
+
+
+def show_telemetry(f: Callable[..., Any]) -> Callable[..., Any]:
+    """
+    For JSON responses, add telemetry information to the payload.
+
+    This allows us to instrument the stack by adding timestamps at different levels,
+    eg:
+
+        g.telemetry.add("START_RUN_QUERY")
+        data = run_query(sql)
+        g.telemetry.add("END_RUN_QUERY")
+
+    And then we can display this information in the UI.
+    """
+
+    @wraps(f)
+    def wrapped(*args: Any, **kwargs: Any) -> Any:
+        result = f(*args, **kwargs)
+        if hasattr(result, "get_json"):
+            try:
+                json_data = result.get_json()
+            except Exception:  # pylint: disable=broad-exception-caught
+                return result
+
+            if isinstance(json_data, dict) and hasattr(g, "telemetry"):
+                json_data["telemetry"] = g.telemetry.to_dict()
+                return jsonify(json_data)
+
+        return result
+
+    return wrapped
diff --git a/superset/views/base.py b/superset/views/base.py
index c8b4862710..1402953492 100644
--- a/superset/views/base.py
+++ b/superset/views/base.py
@@ -74,6 +74,7 @@ from superset.exceptions import (
     SupersetSecurityException,
 )
 from superset.extensions import cache_manager
+from superset.extensions.telemetry import TelemetryHandler
 from superset.models.helpers import ImportExportMixin
 from superset.reports.models import ReportRecipientType
 from superset.superset_typing import FlaskResponse
@@ -725,3 +726,8 @@ def apply_http_headers(response: Response) -> Response:
         if k not in response.headers:
             response.headers[k] = v
     return response
+
+
+@superset_app.before_request
+def start_telemetry() -> None:
+    g.telemetry = TelemetryHandler()
