codeant-ai-for-open-source[bot] commented on code in PR #36529:
URL: https://github.com/apache/superset/pull/36529#discussion_r2619626745


##########
superset/models/core.py:
##########
@@ -1276,6 +1276,38 @@ def purge_oauth2_tokens(self) -> None:
             DatabaseUserOAuth2Tokens.id == self.id
         ).delete()
 
+    def execute(
+        self,
+        sql: str,
+        options: Any | None = None,
+    ) -> Any:
+        """
+        Execute SQL synchronously.
+
+        :param sql: SQL query to execute
+        :param options: QueryOptions with execution settings
+        :returns: QueryResult with status, data, and metadata
+        """
+        from superset.sql.execution import SQLExecutor
+
+        return SQLExecutor(self).execute(sql, options)
+
+    def execute_async(
+        self,
+        sql: str,
+        options: Any | None = None,
+    ) -> Any:
+        """
+        Execute SQL asynchronously via Celery.
+
+        :param sql: SQL query to execute
+        :param options: QueryOptions with execution settings
+        :returns: AsyncQueryHandle for tracking the query
+        """
+        from superset.sql.execution import SQLExecutor
+
+        return SQLExecutor(self).execute_async(sql, options)

Review Comment:
   **Suggestion:** The two new methods duplicate the same import/instantiation 
logic; extract a single private helper that creates the SQLExecutor to avoid 
copy-paste and reduce future inconsistencies (minimal change: helper plus 
delegating wrappers). [code duplication]
   
   **Severity Level:** Minor ⚠️
   ```suggestion
       def _get_sql_executor(self):
        # local import to avoid potential circular imports at module import time
           from superset.sql.execution import SQLExecutor
           return SQLExecutor(self)
   
       def execute(
           self,
           sql: str,
           options: Any | None = None,
       ) -> Any:
           """
           Execute SQL synchronously.
   
           :param sql: SQL query to execute
           :param options: QueryOptions with execution settings
           :returns: QueryResult with status, data, and metadata
           """
           return self._get_sql_executor().execute(sql, options)
   
       def execute_async(
           self,
           sql: str,
           options: Any | None = None,
       ) -> Any:
           """
           Execute SQL asynchronously via Celery.
   
           :param sql: SQL query to execute
           :param options: QueryOptions with execution settings
           :returns: AsyncQueryHandle for tracking the query
           """
           return self._get_sql_executor().execute_async(sql, options)
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   Good, low-risk cleanup. Extracting a small helper to instantiate SQLExecutor 
reduces duplication and the chance of diverging behavior between the two 
wrappers. It's a purely internal refactor that doesn't change semantics and 
improves maintainability.
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/models/core.py
   **Line:** 1279:1309
   **Comment:**
        *Code Duplication: The two new methods duplicate the same 
import/instantiation logic; extract a single private helper that creates the 
SQLExecutor to avoid copy-paste and reduce future inconsistencies (minimal 
change: helper plus delegating wrappers).
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>



##########
superset/sql/execution/__init__.py:
##########
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from superset.sql.execution.executor import SQLExecutor

Review Comment:
   **Suggestion:** Using an absolute import of a sibling module (`from 
superset.sql.execution.executor import SQLExecutor`) inside a package 
`__init__` can fail in some packaging/top-level import contexts and is fragile 
to refactors; use a relative import to reliably import the sibling module. 
[possible bug]
   
   **Severity Level:** Critical 🚨
   ```suggestion
   from .executor import SQLExecutor
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   Using a relative import (from .executor import SQLExecutor) inside a package 
__init__ is the idiomatic, more robust choice — it avoids certain 
packaging/path resolution edge cases and is resilient to refactors of the 
top-level package name. The change is trivial, safe, and doesn't alter runtime 
behavior in normal cases.
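
   A minimal sketch of the two spellings (nothing here beyond what the suggestion implies); the public import path used by callers stays the same either way:

   ```python
# superset/sql/execution/__init__.py
from .executor import SQLExecutor  # relative: resolves against this package

# Other modules keep importing through the package as before:
#   from superset.sql.execution import SQLExecutor
   ```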
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/sql/execution/__init__.py
   **Line:** 18:18
   **Comment:**
        *Possible Bug: Using an absolute import of a sibling module (`from 
superset.sql.execution.executor import SQLExecutor`) inside a package 
`__init__` can fail in some packaging/top-level import contexts and is fragile 
to refactors; use a relative import to reliably import the sibling module.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>



##########
superset/sql/execution/celery_task.py:
##########
@@ -0,0 +1,470 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Celery task for async SQL execution.
+
+This module provides the Celery task for executing SQL queries asynchronously.
+It is used by SQLExecutor.execute_async() to run queries in the background.
+"""
+
+from __future__ import annotations
+
+import dataclasses
+import logging
+import uuid
+from sys import getsizeof
+from typing import Any, TYPE_CHECKING
+
+import msgpack
+from celery.exceptions import SoftTimeLimitExceeded
+from flask import current_app as app, has_app_context
+from flask_babel import gettext as __
+
+from superset import (
+    db,
+    results_backend,
+    security_manager,
+)
+from superset.common.db_query_status import QueryStatus
+from superset.constants import QUERY_CANCEL_KEY
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    SupersetErrorException,
+    SupersetErrorsException,
+)
+from superset.extensions import celery_app
+from superset.models.sql_lab import Query
+from superset.result_set import SupersetResultSet
+from superset.sql.execution.executor import execute_sql_with_cursor
+from superset.sql.parse import SQLScript
+from superset.sqllab.utils import write_ipc_buffer
+from superset.utils import json
+from superset.utils.core import override_user, zlib_compress
+from superset.utils.dates import now_as_float
+from superset.utils.decorators import stats_timing
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+BYTES_IN_MB = 1024 * 1024
+
+
+def _get_query(query_id: int) -> Query:
+    """Get the query by ID."""
+    return db.session.query(Query).filter_by(id=query_id).one()
+
+
+def _handle_query_error(
+    ex: Exception,
+    query: Query,
+    payload: dict[str, Any] | None = None,
+    prefix_message: str = "",
+) -> dict[str, Any]:
+    """Handle error while processing the SQL query."""
+    payload = payload or {}
+    msg = f"{prefix_message} {str(ex)}".strip()
+    query.error_message = msg
+    query.tmp_table_name = None
+    query.status = QueryStatus.FAILED
+
+    if not query.end_time:
+        query.end_time = now_as_float()
+
+    # Extract DB-specific errors
+    if isinstance(ex, SupersetErrorException):
+        errors = [ex.error]
+    elif isinstance(ex, SupersetErrorsException):
+        errors = ex.errors
+    else:
+        errors = query.database.db_engine_spec.extract_errors(
+            str(ex), database_name=query.database.unique_name
+        )
+
+    errors_payload = [dataclasses.asdict(error) for error in errors]
+    if errors:
+        query.set_extra_json_key("errors", errors_payload)
+
+    db.session.commit()  # pylint: disable=consider-using-transaction
+    payload.update({"status": query.status, "error": msg, "errors": errors_payload})
+    if troubleshooting_link := app.config.get("TROUBLESHOOTING_LINK"):
+        payload["link"] = troubleshooting_link
+    return payload
+
+
+def _serialize_payload(payload: dict[Any, Any]) -> bytes:
+    """Serialize payload for storage based on RESULTS_BACKEND_USE_MSGPACK 
config."""
+    from superset import results_backend_use_msgpack
+
+    if results_backend_use_msgpack:
+        return msgpack.dumps(payload, default=json.json_iso_dttm_ser, use_bin_type=True)
+    return json.dumps(payload, default=json.json_iso_dttm_ser, ignore_nan=True).encode(
+        "utf-8"
+    )
+
+
+def _prepare_statement_blocks(
+    rendered_query: str,
+    db_engine_spec: Any,
+) -> tuple[SQLScript, list[str]]:
+    """
+    Parse SQL and build statement blocks for execution.
+
+    Note: RLS, security checks, and other preprocessing are handled by
+    SQLExecutor before the query reaches this task.
+    """
+    parsed_script = SQLScript(rendered_query, engine=db_engine_spec.engine)
+
+    # Build statement blocks for execution
+    if db_engine_spec.run_multiple_statements_as_one:
+        blocks = [parsed_script.format(comments=db_engine_spec.allows_sql_comments)]
+    else:
+        blocks = [
+            statement.format(comments=db_engine_spec.allows_sql_comments)
+            for statement in parsed_script.statements
+        ]
+
+    return parsed_script, blocks
+
+
+def _finalize_successful_query(
+    query: Query,
+    execution_results: list[tuple[str, SupersetResultSet | None, float, int]],
+    payload: dict[str, Any],
+    total_execution_time_ms: float,
+) -> None:
+    """Update query metadata and payload after successful execution."""
+    # Calculate total rows across all statements
+    total_rows = 0
+    statements_data: list[dict[str, Any]] = []
+
+    for stmt_sql, result_set, exec_time, rowcount in execution_results:
+        if result_set is not None:
+            # SELECT statement
+            total_rows += result_set.size
+            data, columns = _serialize_result_set(result_set)
+            statements_data.append(
+                {
+                    "statement": stmt_sql,
+                    "data": data,
+                    "columns": columns,
+                    "row_count": result_set.size,
+                    "execution_time_ms": exec_time,
+                }
+            )
+        else:
+            # DML statement - no data, just row count
+            statements_data.append(
+                {
+                    "statement": stmt_sql,
+                    "data": None,
+                    "columns": [],
+                    "row_count": rowcount,
+                    "execution_time_ms": exec_time,
+                }
+            )
+
+    query.rows = total_rows
+    query.progress = 100
+    query.set_extra_json_key("progress", None)
+    # Store columns from last statement (for compatibility)
+    if execution_results and execution_results[-1][1] is not None:
+        query.set_extra_json_key("columns", execution_results[-1][1].columns)
+    query.end_time = now_as_float()
+
+    payload.update(
+        {
+            "status": QueryStatus.SUCCESS,
+            "statements": statements_data,
+            "total_execution_time_ms": total_execution_time_ms,
+            "query": query.to_dict(),
+        }
+    )
+    payload["query"]["state"] = QueryStatus.SUCCESS
+
+
+def _store_results_in_backend(
+    query: Query,
+    payload: dict[str, Any],
+    database: Any,
+) -> None:
+    """Store query results in the results backend."""
+    key = str(uuid.uuid4())
+    payload["query"]["resultsKey"] = key
+    logger.info(
+        "Query %s: Storing results in results backend, key: %s",
+        str(query.id),
+        key,
+    )
+    stats_logger = app.config["STATS_LOGGER"]
+    with stats_timing("sqllab.query.results_backend_write", stats_logger):
+        with stats_timing(
+            "sqllab.query.results_backend_write_serialization", stats_logger
+        ):
+            serialized_payload = _serialize_payload(payload)
+
+            # Check payload size limit
+            if sql_lab_payload_max_mb := app.config.get("SQLLAB_PAYLOAD_MAX_MB"):
+                serialized_payload_size = len(serialized_payload)
+                max_bytes = sql_lab_payload_max_mb * BYTES_IN_MB
+
+                if serialized_payload_size > max_bytes:
+                    logger.info("Result size exceeds the allowed limit.")
+                    raise SupersetErrorException(
+                        SupersetError(
+                            message=(
+                                f"Result size "
+                                f"({serialized_payload_size / BYTES_IN_MB:.2f} MB) "
+                                f"exceeds the allowed limit of "
+                                f"{sql_lab_payload_max_mb} MB."
+                            ),
+                            error_type=SupersetErrorType.RESULT_TOO_LARGE_ERROR,
+                            level=ErrorLevel.ERROR,
+                        )
+                    )
+
+        cache_timeout = database.cache_timeout
+        if cache_timeout is None:
+            cache_timeout = app.config["CACHE_DEFAULT_TIMEOUT"]
+
+        compressed = zlib_compress(serialized_payload)
+        logger.debug("*** serialized payload size: %i", getsizeof(serialized_payload))
+        logger.debug("*** compressed payload size: %i", getsizeof(compressed))

Review Comment:
   **Suggestion:** Incorrect size measurement for payload/debug logs: using 
`sys.getsizeof` reports Python object size and not the actual byte length of 
the serialized payload/compressed bytes; use `len()` on the bytes objects for 
accurate size logs and comparisons. [possible bug]
   
   **Severity Level:** Critical 🚨
   ```suggestion
        logger.debug("*** serialized payload size: %i", len(serialized_payload))
           logger.debug("*** compressed payload size: %i", len(compressed))
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   For bytes objects you want the byte length (len(...)) to log and compare 
sizes accurately. getsizeof() returns interpreter-level object size including 
overhead and is misleading for payload size checks.
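
   A quick sketch of the difference (figures are approximate and CPython-specific):

   ```python
import sys
import zlib

serialized_payload = b"x" * 1_000_000
compressed = zlib.compress(serialized_payload)

len(serialized_payload)            # 1000000 -- the actual byte length
sys.getsizeof(serialized_payload)  # ~1000033 on CPython: includes object header overhead
len(compressed)                    # true size of the compressed bytes
   ```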
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/sql/execution/celery_task.py
   **Line:** 245:247
   **Comment:**
        *Possible Bug: Incorrect size measurement for payload/debug logs: using 
`sys.getsizeof` reports Python object size and not the actual byte length of 
the serialized payload/compressed bytes; use `len()` on the bytes objects for 
accurate size logs and comparisons.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>



##########
superset/sql/execution/executor.py:
##########
@@ -0,0 +1,1080 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+SQL Executor implementation for Database.execute() and execute_async().
+
+This module provides the SQLExecutor class that implements the query execution
+methods defined in superset_core.api.models.Database.
+
+Implementation Features
+-----------------------
+
+Query Preparation (applies to both sync and async):
+- Jinja2 template rendering (via template_params in QueryOptions)
+- SQL mutation via SQL_QUERY_MUTATOR config hook
+- DML permission checking (requires database.allow_dml=True for DML)
+- Disallowed functions checking via DISALLOWED_SQL_FUNCTIONS config
+- Row-level security (RLS) via AST transformation (always applied)
+- Result limit application via SQL_MAX_ROW config
+- Catalog/schema resolution and validation
+
+Synchronous Execution (execute):
+- Multi-statement SQL parsing and execution
+- Progress tracking via Query model
+- Result caching via cache_manager.data_cache
+- Query logging via QUERY_LOGGER config hook
+- Timeout protection via SQLLAB_TIMEOUT config
+- Dry run mode (returns transformed SQL without execution)
+
+Asynchronous Execution (execute_async):
+- Celery task submission for background execution
+- Security validation before submission
+- Query model creation with PENDING status
+- Result caching check (returns cached if available)
+- Background execution with timeout via SQLLAB_ASYNC_TIME_LIMIT_SEC
+- Results stored in results backend for retrieval
+- Handle-based progress tracking and cancellation
+
+See Database.execute() and Database.execute_async() docstrings in
+superset_core.api.models for the public API contract.
+"""
+
+from __future__ import annotations
+
+import logging
+import time
+from datetime import datetime
+from typing import Any, TYPE_CHECKING
+
+from flask import current_app as app, g, has_app_context
+
+from superset import db
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    SupersetSecurityException,
+    SupersetTimeoutException,
+)
+from superset.extensions import cache_manager
+from superset.sql.parse import SQLScript
+from superset.utils import core as utils
+
+if TYPE_CHECKING:
+    from superset_core.api.types import (
+        AsyncQueryHandle,
+        QueryOptions,
+        QueryResult,
+    )
+
+    from superset.models.core import Database
+    from superset.result_set import SupersetResultSet
+
+logger = logging.getLogger(__name__)
+
+
+def execute_sql_with_cursor(
+    database: Database,
+    cursor: Any,
+    statements: list[str],
+    query: Any,
+    log_query_fn: Any | None = None,
+    check_stopped_fn: Any | None = None,
+    execute_fn: Any | None = None,
+) -> list[tuple[str, SupersetResultSet | None, float, int]]:
+    """
+    Execute SQL statements with a cursor and return all result sets.
+
+    This is the shared execution logic used by both sync (SQLExecutor) and
+    async (celery_task) execution paths. It handles multi-statement execution
+    with progress tracking via the Query model.
+
+    :param database: Database model to execute against
+    :param cursor: Database cursor to use for execution
+    :param statements: List of SQL statements to execute
+    :param query: Query model for progress tracking
+    :param log_query_fn: Optional function to log queries, called as fn(sql, schema)
+    :param check_stopped_fn: Optional function to check if query was stopped.
+        Should return True if stopped. Used by async execution for cancellation.
+    :param execute_fn: Optional custom execute function. If not provided, uses
+        database.db_engine_spec.execute(cursor, sql, database). Custom function
+        should accept (cursor, sql) and handle execution.
+    :returns: List of (statement_sql, result_set, execution_time_ms, rowcount) tuples
+        Returns empty list if stopped. Raises exception on error (fail-fast).
+    """
+    from superset.result_set import SupersetResultSet
+
+    total = len(statements)
+    if total == 0:
+        return []
+
+    results: list[tuple[str, SupersetResultSet | None, float, int]] = []
+
+    for i, statement in enumerate(statements):
+        # Check if query was stopped (async cancellation)
+        if check_stopped_fn and check_stopped_fn():
+            return results
+
+        stmt_start_time = time.time()
+
+        # Apply SQL mutation
+        stmt_sql = database.mutate_sql_based_on_config(
+            statement,
+            is_split=True,
+        )
+
+        # Log query
+        if log_query_fn:
+            log_query_fn(stmt_sql, query.schema)
+
+        # Execute - use custom function or default
+        if execute_fn:
+            execute_fn(cursor, stmt_sql)
+        else:
+            database.db_engine_spec.execute(cursor, stmt_sql, database)
+
+        stmt_execution_time = (time.time() - stmt_start_time) * 1000
+
+        # Fetch results from ALL statements
+        description = cursor.description
+        if description:
+            rows = database.db_engine_spec.fetch_data(cursor)
+            result_set = SupersetResultSet(
+                rows,
+                description,
+                database.db_engine_spec,
+            )
+        else:
+            # DML statement - no result set
+            result_set = None
+
+        # Get row count for DML statements
+        rowcount = cursor.rowcount if hasattr(cursor, "rowcount") else 0
+
+        results.append((stmt_sql, result_set, stmt_execution_time, rowcount))
+
+        # Update progress on Query model
+        progress_pct = int(((i + 1) / total) * 100)
+        query.progress = progress_pct
+        query.set_extra_json_key(
+            "progress",
+            f"Running statement {i + 1} of {total}",
+        )
+        db.session.commit()  # pylint: disable=consider-using-transaction
+
+    return results
+
+
+class SQLExecutor:
+    """
+    SQL query executor implementation.
+
+    Implements Database.execute() and execute_async() methods.
+    See superset_core.api.models.Database for the full public API documentation.
+    """
+
+    def __init__(self, database: Database) -> None:
+        """
+        Initialize the executor with a database.
+
+        :param database: Database model instance to execute queries against
+        """
+        self.database = database
+
+    def execute(
+        self,
+        sql: str,
+        options: QueryOptions | None = None,
+    ) -> QueryResult:
+        """
+        Execute SQL synchronously.
+
+        If options.dry_run=True, returns the transformed SQL without execution.
+        All transformations (RLS, templates, limits) are still applied.
+
+        See superset_core.api.models.Database.execute() for full documentation.
+        """
+        from superset_core.api.types import (
+            QueryOptions as QueryOptionsType,
+            QueryResult as QueryResultType,
+            QueryStatus,
+            StatementResult,
+        )
+
+        opts: QueryOptionsType = options or QueryOptionsType()
+        start_time = time.time()
+
+        try:
+            # 1. Prepare SQL (assembly only, no security checks)
+            script, catalog, schema = self._prepare_sql(sql, opts)
+
+            # 2. Security checks
+            self._check_security(script)
+
+            # 3. Get mutation status and format SQL
+            has_mutation = script.has_mutation()
+            final_sql = script.format()
+
+            # DRY RUN: Return transformed SQL without execution
+            if opts.dry_run:
+                total_execution_time_ms = (time.time() - start_time) * 1000
+                # Create a StatementResult for each statement in dry-run mode
+                dry_run_statements = [
+                    StatementResult(
+                        statement=stmt.format(),
+                        data=None,
+                        row_count=0,
+                        execution_time_ms=0,
+                    )
+                    for stmt in script.statements
+                ]
+                return QueryResultType(
+                    status=QueryStatus.SUCCESS,
+                    statements=dry_run_statements,
+                    query_id=None,
+                    total_execution_time_ms=total_execution_time_ms,
+                    is_cached=False,
+                )
+
+            # 4. Check cache
+            cached_result = self._try_get_cached_result(has_mutation, final_sql, opts)
+            if cached_result:
+                return cached_result
+
+            # 5. Create Query model for audit
+            query = self._create_query_record(
+                final_sql, opts, catalog, schema, status="running"
+            )
+
+            # 6. Execute with timeout
+            timeout = opts.timeout_seconds or app.config.get("SQLLAB_TIMEOUT", 30)
+            timeout_msg = f"Query exceeded the {timeout} seconds timeout."
+
+            with utils.timeout(seconds=timeout, error_message=timeout_msg):
+                statement_results = self._execute_statements(
+                    final_sql,
+                    catalog,
+                    schema,
+                    query,
+                )
+
+            total_execution_time_ms = (time.time() - start_time) * 1000
+
+            # Calculate total row count for Query model
+            total_rows = sum(stmt.row_count for stmt in statement_results)
+
+            # Update query record
+            query.status = "success"
+            query.rows = total_rows
+            query.progress = 100
+            db.session.commit()  # pylint: disable=consider-using-transaction
+
+            result = QueryResultType(
+                status=QueryStatus.SUCCESS,
+                statements=statement_results,
+                query_id=query.id,
+                total_execution_time_ms=total_execution_time_ms,
+            )
+
+            # Store in cache (if SELECT and caching enabled)
+            if not has_mutation:
+                self._store_in_cache(result, final_sql, opts)
+
+            return result
+
+        except SupersetTimeoutException:
+            return self._create_error_result(
+                QueryStatus.TIMED_OUT,
+                "Query exceeded the timeout limit",
+                sql,
+                start_time,
+            )
+        except SupersetSecurityException as ex:
+            return self._create_error_result(
+                QueryStatus.FAILED, str(ex), sql, start_time
+            )
+        except Exception as ex:
+            error_msg = self.database.db_engine_spec.extract_error_message(ex)
+            return self._create_error_result(
+                QueryStatus.FAILED, error_msg, sql, start_time
+            )
+
+    def execute_async(
+        self,
+        sql: str,
+        options: QueryOptions | None = None,
+    ) -> AsyncQueryHandle:
+        """
+        Execute SQL asynchronously via Celery.
+
+        If options.dry_run=True, returns the transformed SQL as a completed
+        AsyncQueryHandle without submitting to Celery.
+
+        See superset_core.api.models.Database.execute_async() for full documentation.
+        """
+        from superset_core.api.types import (
+            QueryOptions as QueryOptionsType,
+            QueryResult as QueryResultType,
+            QueryStatus,
+        )
+
+        opts: QueryOptionsType = options or QueryOptionsType()
+
+        # 1. Prepare SQL (assembly only, no security checks)
+        script, catalog, schema = self._prepare_sql(sql, opts)
+
+        # 2. Security checks
+        self._check_security(script)
+
+        # 3. Get mutation status and format SQL
+        has_mutation = script.has_mutation()
+        final_sql = script.format()
+
+        # DRY RUN: Return transformed SQL as completed async handle
+        if opts.dry_run:
+            from superset_core.api.types import StatementResult
+
+            dry_run_statements = [
+                StatementResult(
+                    statement=stmt.format(),
+                    data=None,
+                    row_count=0,
+                    execution_time_ms=0,
+                )
+                for stmt in script.statements
+            ]
+            dry_run_result = QueryResultType(
+                status=QueryStatus.SUCCESS,
+                statements=dry_run_statements,
+                query_id=None,
+                total_execution_time_ms=0,
+                is_cached=False,
+            )
+            return self._create_cached_handle(dry_run_result)
+
+        # 4. Check cache
+        if cached_result := self._try_get_cached_result(has_mutation, final_sql, opts):
+            return self._create_cached_handle(cached_result)
+
+        # 5. Create Query model for audit
+        query = self._create_query_record(
+            final_sql, opts, catalog, schema, status="pending"
+        )
+
+        # 6. Submit to Celery
+        self._submit_query_to_celery(query, final_sql, opts)
+
+        # 7. Create and return handle with bound methods
+        return self._create_async_handle(query.id)
+
+    def _prepare_sql(
+        self,
+        sql: str,
+        opts: QueryOptions,
+    ) -> tuple[SQLScript, str | None, str | None]:
+        """
+        Prepare SQL for execution (no side effects, no security checks).
+
+        This method performs SQL preprocessing:
+        1. Template rendering
+        2. SQL parsing
+        3. Catalog/schema resolution
+        4. RLS application
+        5. Limit application (if not mutation)
+
+        Security checks (disallowed functions, DML permission) are performed
+        by the caller after receiving the prepared script.
+
+        :param sql: Original SQL query
+        :param opts: Query options
+        :returns: Tuple of (prepared SQLScript, catalog, schema)
+        """
+        # 1. Render Jinja2 templates
+        rendered_sql = self._render_sql_template(sql, opts.template_params)
+
+        # 2. Parse SQL with SQLScript
+        script = SQLScript(rendered_sql, self.database.db_engine_spec.engine)
+
+        # 3. Get catalog and schema
+        catalog = opts.catalog or self.database.get_default_catalog()
+        schema = opts.schema or self.database.get_default_schema(catalog)
+
+        # 4. Apply RLS directly to script statements
+        self._apply_rls_to_script(script, catalog, schema)
+
+        # 5. Apply limit only if not a mutation
+        if not script.has_mutation():
+            self._apply_limit_to_script(script, opts)
+
+        return script, catalog, schema
+
+    def _check_security(self, script: SQLScript) -> None:
+        """
+        Perform security checks on prepared SQL script.
+
+        :param script: Prepared SQLScript
+        :raises SupersetSecurityException: If security checks fail
+        """
+        # Check disallowed functions
+        if disallowed := self._check_disallowed_functions(script):
+            raise SupersetSecurityException(
+                SupersetError(
+                    message=f"Disallowed SQL functions: {', '.join(disallowed)}",
+                    error_type=SupersetErrorType.INVALID_SQL_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            )
+
+        # Check DML permission
+        if script.has_mutation() and not self.database.allow_dml:
+            raise SupersetSecurityException(
+                SupersetError(
+                    message="DML queries are not allowed on this database",
+                    error_type=SupersetErrorType.DML_NOT_ALLOWED_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            )
+
+    def _execute_statements(
+        self,
+        sql: str,
+        catalog: str | None,
+        schema: str | None,
+        query: Any,
+    ) -> list[Any]:
+        """
+        Execute SQL statements and return per-statement results.
+
+        Progress is tracked via Query.progress field.
+        Uses the same execution path for both single and multi-statement queries.
+
+        :param sql: Final SQL to execute (with RLS and all transformations applied)
+        :param catalog: Catalog name
+        :param schema: Schema name
+        :param query: Query model for progress tracking
+        :returns: List of StatementResult objects
+        """
+        from superset_core.api.types import StatementResult
+
+        # Parse the final SQL (with RLS applied) to get statements
+        script = SQLScript(sql, self.database.db_engine_spec.engine)
+        statements = script.statements
+
+        # Handle empty script
+        if not statements:
+            return []
+
+        results_list = []
+
+        # Use consistent execution path for all queries
+        with self.database.get_raw_connection(catalog=catalog, schema=schema) as conn:
+            cursor = conn.cursor()
+

Review Comment:
   **Suggestion:** Cursor is created via `cursor = conn.cursor()` but never 
guaranteed to be closed; this can leak DB-API cursors for some drivers. Use a 
context manager (or contextlib.closing) to ensure the cursor is closed even on 
errors. [resource leak]
   
   **Severity Level:** Minor ⚠️
   ```suggestion
               from contextlib import closing
               with closing(conn.cursor()) as cursor:
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   Good, accurate suggestion — the cursor is created but not explicitly closed. 
Wrapping conn.cursor() with contextlib.closing (or using the DB-API's context 
manager if available) prevents a potential resource leak for drivers that don't 
auto-close cursors when the connection context exits. The improved snippet is 
syntactically correct and applies locally.
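
   For context, a sketch of how the surrounding block could read with the suggestion applied (the existing cursor logic would simply move one indentation level deeper):

   ```python
from contextlib import closing

with self.database.get_raw_connection(catalog=catalog, schema=schema) as conn:
    with closing(conn.cursor()) as cursor:
        # existing per-statement execution using `cursor` goes here
        ...
   ```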
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/sql/execution/executor.py
   **Line:** 484:485
   **Comment:**
        *Resource Leak: Cursor is created via `cursor = conn.cursor()` but 
never guaranteed to be closed; this can leak DB-API cursors for some drivers. 
Use a context manager (or contextlib.closing) to ensure the cursor is closed 
even on errors.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>



##########
superset/sql/execution/celery_task.py:
##########
@@ -0,0 +1,470 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Celery task for async SQL execution.
+
+This module provides the Celery task for executing SQL queries asynchronously.
+It is used by SQLExecutor.execute_async() to run queries in the background.
+"""
+
+from __future__ import annotations
+
+import dataclasses
+import logging
+import uuid
+from sys import getsizeof
+from typing import Any, TYPE_CHECKING
+
+import msgpack
+from celery.exceptions import SoftTimeLimitExceeded
+from flask import current_app as app, has_app_context
+from flask_babel import gettext as __
+
+from superset import (
+    db,
+    results_backend,
+    security_manager,
+)
+from superset.common.db_query_status import QueryStatus
+from superset.constants import QUERY_CANCEL_KEY
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    SupersetErrorException,
+    SupersetErrorsException,
+)
+from superset.extensions import celery_app
+from superset.models.sql_lab import Query
+from superset.result_set import SupersetResultSet
+from superset.sql.execution.executor import execute_sql_with_cursor
+from superset.sql.parse import SQLScript
+from superset.sqllab.utils import write_ipc_buffer
+from superset.utils import json
+from superset.utils.core import override_user, zlib_compress
+from superset.utils.dates import now_as_float
+from superset.utils.decorators import stats_timing
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+BYTES_IN_MB = 1024 * 1024
+
+
+def _get_query(query_id: int) -> Query:
+    """Get the query by ID."""
+    return db.session.query(Query).filter_by(id=query_id).one()
+
+
+def _handle_query_error(
+    ex: Exception,
+    query: Query,
+    payload: dict[str, Any] | None = None,
+    prefix_message: str = "",
+) -> dict[str, Any]:
+    """Handle error while processing the SQL query."""
+    payload = payload or {}
+    msg = f"{prefix_message} {str(ex)}".strip()
+    query.error_message = msg
+    query.tmp_table_name = None
+    query.status = QueryStatus.FAILED
+
+    if not query.end_time:
+        query.end_time = now_as_float()
+
+    # Extract DB-specific errors
+    if isinstance(ex, SupersetErrorException):
+        errors = [ex.error]
+    elif isinstance(ex, SupersetErrorsException):
+        errors = ex.errors
+    else:
+        errors = query.database.db_engine_spec.extract_errors(
+            str(ex), database_name=query.database.unique_name
+        )
+
+    errors_payload = [dataclasses.asdict(error) for error in errors]
+    if errors:
+        query.set_extra_json_key("errors", errors_payload)
+
+    db.session.commit()  # pylint: disable=consider-using-transaction
+    payload.update({"status": query.status, "error": msg, "errors": errors_payload})
+    if troubleshooting_link := app.config.get("TROUBLESHOOTING_LINK"):
+        payload["link"] = troubleshooting_link
+    return payload
+
+
+def _serialize_payload(payload: dict[Any, Any]) -> bytes:
+    """Serialize payload for storage based on RESULTS_BACKEND_USE_MSGPACK 
config."""
+    from superset import results_backend_use_msgpack
+
+    if results_backend_use_msgpack:
+        return msgpack.dumps(payload, default=json.json_iso_dttm_ser, use_bin_type=True)
+    return json.dumps(payload, default=json.json_iso_dttm_ser, ignore_nan=True).encode(
+        "utf-8"
+    )
+
+
+def _prepare_statement_blocks(
+    rendered_query: str,
+    db_engine_spec: Any,
+) -> tuple[SQLScript, list[str]]:
+    """
+    Parse SQL and build statement blocks for execution.
+
+    Note: RLS, security checks, and other preprocessing are handled by
+    SQLExecutor before the query reaches this task.
+    """
+    parsed_script = SQLScript(rendered_query, engine=db_engine_spec.engine)
+
+    # Build statement blocks for execution
+    if db_engine_spec.run_multiple_statements_as_one:
+        blocks = [parsed_script.format(comments=db_engine_spec.allows_sql_comments)]
+    else:
+        blocks = [
+            statement.format(comments=db_engine_spec.allows_sql_comments)
+            for statement in parsed_script.statements
+        ]
+
+    return parsed_script, blocks
+
+
+def _finalize_successful_query(
+    query: Query,
+    execution_results: list[tuple[str, SupersetResultSet | None, float, int]],
+    payload: dict[str, Any],
+    total_execution_time_ms: float,
+) -> None:
+    """Update query metadata and payload after successful execution."""
+    # Calculate total rows across all statements
+    total_rows = 0
+    statements_data: list[dict[str, Any]] = []
+
+    for stmt_sql, result_set, exec_time, rowcount in execution_results:
+        if result_set is not None:
+            # SELECT statement
+            total_rows += result_set.size
+            data, columns = _serialize_result_set(result_set)
+            statements_data.append(
+                {
+                    "statement": stmt_sql,
+                    "data": data,
+                    "columns": columns,
+                    "row_count": result_set.size,
+                    "execution_time_ms": exec_time,
+                }
+            )
+        else:
+            # DML statement - no data, just row count
+            statements_data.append(
+                {
+                    "statement": stmt_sql,
+                    "data": None,
+                    "columns": [],
+                    "row_count": rowcount,
+                    "execution_time_ms": exec_time,
+                }
+            )
+
+    query.rows = total_rows
+    query.progress = 100
+    query.set_extra_json_key("progress", None)
+    # Store columns from last statement (for compatibility)
+    if execution_results and execution_results[-1][1] is not None:
+        query.set_extra_json_key("columns", execution_results[-1][1].columns)
+    query.end_time = now_as_float()
+
+    payload.update(
+        {
+            "status": QueryStatus.SUCCESS,
+            "statements": statements_data,
+            "total_execution_time_ms": total_execution_time_ms,
+            "query": query.to_dict(),
+        }
+    )
+    payload["query"]["state"] = QueryStatus.SUCCESS

Review Comment:
   **Suggestion:** Serialization bug: payload fields are assigned Enum members 
(`QueryStatus`) directly which may not be JSON/msgpack serializable; convert 
enum members to a primitive (use `.value` or `.name`) before placing them into 
the payload to avoid TypeError during serialization. [possible bug]
   
   **Severity Level:** Critical 🚨
   ```suggestion
               "status": QueryStatus.SUCCESS.value,
               "statements": statements_data,
               "total_execution_time_ms": total_execution_time_ms,
               "query": query.to_dict(),
           }
       )
        # Store a primitive value for the query state so the payload is serializable
       payload["query"]["state"] = QueryStatus.SUCCESS.value
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   Putting Enum objects into a payload destined for msgpack/json serialization 
often fails unless there's an explicit encoder handling enums. Converting to a 
primitive (.value or .name) prevents serialization issues and avoids runtime 
errors when writing results to the backend.
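
   Whether this raises in practice depends on how `QueryStatus` is declared (a `str`-mixin enum serializes cleanly); for a plain `Enum` the stdlib encoder fails, as this small, generic illustration shows:

   ```python
import json
from enum import Enum

class Status(Enum):  # stand-in for an enum without a str mixin
    SUCCESS = "success"

json.dumps({"state": Status.SUCCESS.value})  # ok: '{"state": "success"}'
json.dumps({"state": Status.SUCCESS})        # TypeError: Object of type Status is not JSON serializable
   ```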
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/sql/execution/celery_task.py
   **Line:** 192:198
   **Comment:**
        *Possible Bug: Serialization bug: payload fields are assigned Enum 
members (`QueryStatus`) directly which may not be JSON/msgpack serializable; 
convert enum members to a primitive (use `.value` or `.name`) before placing 
them into the payload to avoid TypeError during serialization.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>



##########
tests/unit_tests/sql/execution/conftest.py:
##########
@@ -0,0 +1,315 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Shared fixtures and helpers for SQL execution tests.
+
+This module provides common mocks, fixtures, and helper functions used across
+test_celery_task.py and test_executor.py to reduce code duplication.
+"""
+
+from contextlib import contextmanager
+from typing import Any
+from unittest.mock import MagicMock
+
+import pandas as pd
+import pytest
+from flask import current_app
+from pytest_mock import MockerFixture
+
+from superset.common.db_query_status import QueryStatus as QueryStatusEnum
+from superset.models.core import Database
+
+# =============================================================================
+# Core Fixtures
+# =============================================================================
+
+
+@pytest.fixture(autouse=True)
+def mock_db_session(mocker: MockerFixture) -> MagicMock:
+    """Mock database session for all tests to avoid foreign key constraints."""
+    mock_session = MagicMock()
+    mocker.patch("superset.sql.execution.executor.db.session", mock_session)
+    mocker.patch("superset.sql.execution.celery_task.db.session", mock_session)
+    return mock_session
+
+
+@pytest.fixture
+def mock_query() -> MagicMock:
+    """Create a mock Query model."""
+    query = MagicMock()
+    query.id = 123
+    query.database_id = 1
+    query.sql = "SELECT * FROM users"
+    query.status = QueryStatusEnum.PENDING
+    query.error_message = None
+    query.progress = 0
+    query.end_time = None
+    query.start_running_time = None
+    query.executed_sql = None
+    query.tmp_table_name = None
+    query.catalog = None
+    query.schema = "public"
+    query.extra = {}
+    query.set_extra_json_key = MagicMock()
+    query.results_key = None
+    query.select_as_cta = False
+    query.rows = 0
+    query.to_dict = MagicMock(return_value={"id": 123})
+    query.database = MagicMock()
+    query.database.db_engine_spec.extract_errors.return_value = []
+    query.database.unique_name = "test_db"
+    query.database.cache_timeout = 300
+    return query
+
+
+@pytest.fixture
+def mock_database() -> MagicMock:
+    """Create a mock Database."""
+    database = MagicMock()
+    database.id = 1
+    database.unique_name = "test_db"
+    database.cache_timeout = 300
+    database.sqlalchemy_uri = "postgresql://localhost/test"
+    database.db_engine_spec = MagicMock()
+    database.db_engine_spec.engine = "postgresql"
+    database.db_engine_spec.run_multiple_statements_as_one = False
+    database.db_engine_spec.allows_sql_comments = True
+    database.db_engine_spec.extract_errors = MagicMock(return_value=[])
+    database.db_engine_spec.execute_with_cursor = MagicMock()
+    database.db_engine_spec.get_cancel_query_id = MagicMock(return_value=None)
+    database.db_engine_spec.patch = MagicMock()
+    database.db_engine_spec.fetch_data = MagicMock(return_value=[])
+    return database
+
+
+@pytest.fixture
+def mock_result_set() -> MagicMock:
+    """Create a mock SupersetResultSet."""
+    result_set = MagicMock()
+    result_set.size = 2
+    result_set.columns = [{"name": "id"}, {"name": "name"}]
+    result_set.pa_table = MagicMock()
+    result_set.to_pandas_df = MagicMock(
+        return_value=pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
+    )
+    return result_set
+
+
+@pytest.fixture
+def database() -> Database:
+    """Create a real test database instance."""
+    return Database(
+        id=1,
+        database_name="test_db",
+        sqlalchemy_uri="sqlite://",
+        allow_dml=False,
+    )
+
+
+@pytest.fixture
+def database_with_dml() -> Database:
+    """Create a real test database instance with DML allowed."""
+    return Database(
+        id=2,
+        database_name="test_db_dml",
+        sqlalchemy_uri="sqlite://",
+        allow_dml=True,
+    )
+
+
+# =============================================================================
+# Helper Functions for Mock Creation
+# =============================================================================
+
+
+def create_mock_cursor(
+    column_names: list[str],
+    data: list[tuple[Any, ...]] | None = None,
+) -> MagicMock:
+    """
+    Create a mock database cursor with column description.
+
+    Args:
+        column_names: List of column names
+        data: Optional data to return from fetchall()
+
+    Returns:
+        Configured MagicMock cursor
+    """
+    mock_cursor = MagicMock()
+    mock_cursor.description = [(name,) for name in column_names]
+    if data is not None:
+        mock_cursor.fetchall.return_value = data
+    return mock_cursor
+
+
+def create_mock_connection(mock_cursor: MagicMock | None = None) -> MagicMock:
+    """
+    Create a mock database connection.
+
+    Args:
+        mock_cursor: Optional cursor to return from cursor()
+
+    Returns:
+        Configured MagicMock connection with context manager support
+    """
+    if mock_cursor is None:
+        mock_cursor = create_mock_cursor([])
+
+    mock_conn = MagicMock()
+    mock_conn.cursor.return_value = mock_cursor
+    mock_conn.close = MagicMock()
+    mock_conn.__enter__ = MagicMock(return_value=mock_conn)
+    mock_conn.__exit__ = MagicMock(return_value=False)
+    return mock_conn
+
+
+def setup_mock_raw_connection(
+    mock_database: MagicMock,
+    mock_connection: MagicMock | None = None,
+) -> MagicMock:
+    """
+    Setup get_raw_connection as a context manager on a mock database.
+
+    Args:
+        mock_database: The database mock to configure
+        mock_connection: Optional connection to yield
+
+    Returns:
+        The configured mock connection
+    """
+    if mock_connection is None:
+        mock_connection = create_mock_connection()
+
+    @contextmanager
+    def _raw_connection(
+        catalog: str | None = None,
+        schema: str | None = None,
+        nullpool: bool = True,
+        source: Any | None = None,
+    ):
+        yield mock_connection
+
+    mock_database.get_raw_connection = _raw_connection
+    return mock_connection
+
+
+def setup_db_session_query_mock(
+    mock_db_session: MagicMock,
+    return_value: Any = None,
+) -> None:
+    """
+    Setup database session query chain for query lookup.
+
+    Args:
+        mock_db_session: The database session mock
+        return_value: Value to return from one_or_none()
+    """
+    filter_mock = mock_db_session.query.return_value.filter_by.return_value
+    filter_mock.one_or_none.return_value = return_value
+
+
+def mock_query_execution(
+    mocker: MockerFixture,
+    database: Database,
+    return_data: list[tuple[Any, ...]],
+    column_names: list[str],
+) -> None:
+    """
+    Mock the raw connection execution path for testing.
+
+    This helper sets up all necessary mocks for executing a query through
+    the database engine spec and returning results.
+
+    Args:
+        mocker: pytest-mock fixture
+        database: Database instance to mock
+        return_data: Data to return from fetch_data, e.g. [(1, "Alice"), (2, "Bob")]
+        column_names: Column names for the result, e.g. ["id", "name"]
+    """
+    from superset.result_set import SupersetResultSet
+
+    # Mock cursor and connection
+    mock_cursor = create_mock_cursor(column_names, return_data)
+    mock_conn = create_mock_connection(mock_cursor)
+
+    mocker.patch.object(database, "get_raw_connection", return_value=mock_conn)
+    mocker.patch.object(
+        database, "mutate_sql_based_on_config", side_effect=lambda sql, **kw: 
sql
+    )
+    mocker.patch.object(database.db_engine_spec, "execute")
+    mocker.patch.object(database.db_engine_spec, "fetch_data", return_value=return_data)
+
+    # Create a real SupersetResultSet that converts to DataFrame properly
+    mock_result_set = MagicMock(spec=SupersetResultSet)
+    mock_result_set.to_pandas_df.return_value = pd.DataFrame(
+        return_data, columns=column_names
+    )
+    mocker.patch("superset.result_set.SupersetResultSet", return_value=mock_result_set)
+
+
+# =============================================================================
+# Composite Fixtures for Common Test Patterns
+# =============================================================================
+
+
+@pytest.fixture
+def default_sql_config(mocker: MockerFixture) -> None:
+    """Patch app config with default SQL execution settings."""
+    mocker.patch.dict(
+        current_app.config,

Review Comment:
   **Suggestion:** Using `current_app.config` in the fixture can raise a 
RuntimeError when the fixture is used without an active Flask application 
context; import the actual `app` object and patch `app.config` or ensure an app 
context is present to avoid "Working outside of application context" errors. 
[possible bug]
   
   **Severity Level:** Critical 🚨
   ```suggestion
       from superset import app
       mocker.patch.dict(
           app.config,
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   The suggestion is reasonable: referencing `current_app` requires an active 
app context and can raise "working outside of application context" if the 
fixture runs without one. Patching the real `app.config` (importing the Flask 
app) avoids that runtime error. It's a behavioral fix, not just stylistic.
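
   One alternative to the snippet above is to make the app dependency explicit in the fixture signature; the `app` fixture name below is hypothetical (whichever fixture the unit-test conftest provides for the Flask app would do), and the config values are placeholders:

   ```python
import pytest
from pytest_mock import MockerFixture


@pytest.fixture
def default_sql_config(app, mocker: MockerFixture) -> None:
    """Patch app config with default SQL execution settings."""
    # Patching the concrete app object avoids relying on an ambient app context.
    mocker.patch.dict(
        app.config,
        {"SQLLAB_TIMEOUT": 30},  # placeholder; keep the real settings from this PR
    )
   ```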
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/unit_tests/sql/execution/conftest.py
   **Line:** 274:275
   **Comment:**
        *Possible Bug: Using `current_app.config` in the fixture can raise a 
RuntimeError when the fixture is used without an active Flask application 
context; import the actual `app` object and patch `app.config` or ensure an app 
context is present to avoid "Working outside of application context" errors.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>



##########
superset/sql/execution/celery_task.py:
##########
@@ -0,0 +1,470 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Celery task for async SQL execution.
+
+This module provides the Celery task for executing SQL queries asynchronously.
+It is used by SQLExecutor.execute_async() to run queries in the background.
+"""
+
+from __future__ import annotations
+
+import dataclasses
+import logging
+import uuid
+from sys import getsizeof
+from typing import Any, TYPE_CHECKING
+
+import msgpack
+from celery.exceptions import SoftTimeLimitExceeded
+from flask import current_app as app, has_app_context
+from flask_babel import gettext as __
+
+from superset import (
+    db,
+    results_backend,
+    security_manager,
+)
+from superset.common.db_query_status import QueryStatus
+from superset.constants import QUERY_CANCEL_KEY
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    SupersetErrorException,
+    SupersetErrorsException,
+)
+from superset.extensions import celery_app
+from superset.models.sql_lab import Query
+from superset.result_set import SupersetResultSet
+from superset.sql.execution.executor import execute_sql_with_cursor
+from superset.sql.parse import SQLScript
+from superset.sqllab.utils import write_ipc_buffer
+from superset.utils import json
+from superset.utils.core import override_user, zlib_compress
+from superset.utils.dates import now_as_float
+from superset.utils.decorators import stats_timing
+
+if TYPE_CHECKING:
+    pass
+
+logger = logging.getLogger(__name__)
+
+BYTES_IN_MB = 1024 * 1024
+
+
+def _get_query(query_id: int) -> Query:
+    """Get the query by ID."""
+    return db.session.query(Query).filter_by(id=query_id).one()
+
+
+def _handle_query_error(
+    ex: Exception,
+    query: Query,
+    payload: dict[str, Any] | None = None,
+    prefix_message: str = "",
+) -> dict[str, Any]:
+    """Handle error while processing the SQL query."""
+    payload = payload or {}
+    msg = f"{prefix_message} {str(ex)}".strip()
+    query.error_message = msg
+    query.tmp_table_name = None
+    query.status = QueryStatus.FAILED
+
+    if not query.end_time:
+        query.end_time = now_as_float()
+
+    # Extract DB-specific errors
+    if isinstance(ex, SupersetErrorException):
+        errors = [ex.error]
+    elif isinstance(ex, SupersetErrorsException):
+        errors = ex.errors
+    else:
+        errors = query.database.db_engine_spec.extract_errors(
+            str(ex), database_name=query.database.unique_name
+        )
+
+    errors_payload = [dataclasses.asdict(error) for error in errors]
+    if errors:
+        query.set_extra_json_key("errors", errors_payload)
+
+    db.session.commit()  # pylint: disable=consider-using-transaction
+    payload.update({"status": query.status, "error": msg, "errors": 
errors_payload})
+    if troubleshooting_link := app.config.get("TROUBLESHOOTING_LINK"):
+        payload["link"] = troubleshooting_link
+    return payload
+
+
+def _serialize_payload(payload: dict[Any, Any]) -> bytes:
+    """Serialize payload for storage based on RESULTS_BACKEND_USE_MSGPACK 
config."""
+    from superset import results_backend_use_msgpack
+
+    if results_backend_use_msgpack:
+        return msgpack.dumps(payload, default=json.json_iso_dttm_ser, use_bin_type=True)
+    return json.dumps(payload, default=json.json_iso_dttm_ser, ignore_nan=True).encode(
+        "utf-8"
+    )
+
+
+def _prepare_statement_blocks(
+    rendered_query: str,
+    db_engine_spec: Any,
+) -> tuple[SQLScript, list[str]]:
+    """
+    Parse SQL and build statement blocks for execution.
+
+    Note: RLS, security checks, and other preprocessing are handled by
+    SQLExecutor before the query reaches this task.
+    """
+    parsed_script = SQLScript(rendered_query, engine=db_engine_spec.engine)
+
+    # Build statement blocks for execution
+    if db_engine_spec.run_multiple_statements_as_one:
+        blocks = [parsed_script.format(comments=db_engine_spec.allows_sql_comments)]
+    else:
+        blocks = [
+            statement.format(comments=db_engine_spec.allows_sql_comments)
+            for statement in parsed_script.statements
+        ]
+
+    return parsed_script, blocks
+
+
+def _finalize_successful_query(
+    query: Query,
+    execution_results: list[tuple[str, SupersetResultSet | None, float, int]],
+    payload: dict[str, Any],
+    total_execution_time_ms: float,
+) -> None:
+    """Update query metadata and payload after successful execution."""
+    # Calculate total rows across all statements
+    total_rows = 0
+    statements_data: list[dict[str, Any]] = []
+
+    for stmt_sql, result_set, exec_time, rowcount in execution_results:
+        if result_set is not None:
+            # SELECT statement
+            total_rows += result_set.size
+            data, columns = _serialize_result_set(result_set)
+            statements_data.append(
+                {
+                    "statement": stmt_sql,
+                    "data": data,
+                    "columns": columns,
+                    "row_count": result_set.size,
+                    "execution_time_ms": exec_time,
+                }
+            )
+        else:
+            # DML statement - no data, just row count
+            statements_data.append(
+                {
+                    "statement": stmt_sql,
+                    "data": None,
+                    "columns": [],
+                    "row_count": rowcount,
+                    "execution_time_ms": exec_time,
+                }
+            )
+
+    query.rows = total_rows
+    query.progress = 100
+    query.set_extra_json_key("progress", None)
+    # Store columns from last statement (for compatibility)
+    if execution_results and execution_results[-1][1] is not None:
+        query.set_extra_json_key("columns", execution_results[-1][1].columns)
+    query.end_time = now_as_float()
+
+    payload.update(
+        {
+            "status": QueryStatus.SUCCESS,
+            "statements": statements_data,
+            "total_execution_time_ms": total_execution_time_ms,
+            "query": query.to_dict(),
+        }
+    )
+    payload["query"]["state"] = QueryStatus.SUCCESS
+
+
+def _store_results_in_backend(
+    query: Query,
+    payload: dict[str, Any],
+    database: Any,
+) -> None:
+    """Store query results in the results backend."""
+    key = str(uuid.uuid4())
+    payload["query"]["resultsKey"] = key
+    logger.info(
+        "Query %s: Storing results in results backend, key: %s",
+        str(query.id),
+        key,
+    )
+    stats_logger = app.config["STATS_LOGGER"]
+    with stats_timing("sqllab.query.results_backend_write", stats_logger):
+        with stats_timing(
+            "sqllab.query.results_backend_write_serialization", stats_logger
+        ):
+            serialized_payload = _serialize_payload(payload)
+
+            # Check payload size limit
+            if sql_lab_payload_max_mb := app.config.get("SQLLAB_PAYLOAD_MAX_MB"):
+                serialized_payload_size = len(serialized_payload)
+                max_bytes = sql_lab_payload_max_mb * BYTES_IN_MB
+
+                if serialized_payload_size > max_bytes:
+                    logger.info("Result size exceeds the allowed limit.")
+                    raise SupersetErrorException(
+                        SupersetError(
+                            message=(
+                                f"Result size "
+                                f"({serialized_payload_size / BYTES_IN_MB:.2f} MB) "
+                                f"exceeds the allowed limit of "
+                                f"{sql_lab_payload_max_mb} MB."
+                            ),
+                            error_type=SupersetErrorType.RESULT_TOO_LARGE_ERROR,
+                            level=ErrorLevel.ERROR,
+                        )
+                    )
+
+        cache_timeout = database.cache_timeout
+        if cache_timeout is None:
+            cache_timeout = app.config["CACHE_DEFAULT_TIMEOUT"]
+
+        compressed = zlib_compress(serialized_payload)
+        logger.debug("*** serialized payload size: %i", 
getsizeof(serialized_payload))
+        logger.debug("*** compressed payload size: %i", getsizeof(compressed))
+
+        write_success = results_backend.set(key, compressed, cache_timeout)
+        if not write_success:
+            logger.error(
+                "Query %s: Failed to store results in backend, key: %s",
+                str(query.id),
+                key,
+            )
+            stats_logger.incr("sqllab.results_backend.write_failure")
+            query.results_key = None
+            query.status = QueryStatus.FAILED
+            query.error_message = (
+                "Failed to store query results in the results backend. "
+                "Please try again or contact your administrator."
+            )
+            db.session.commit()  # pylint: disable=consider-using-transaction
+            raise SupersetErrorException(
+                SupersetError(
+                    message=__("Failed to store query results. Please try again."),
+                    error_type=SupersetErrorType.RESULTS_BACKEND_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            )
+        else:
+            query.results_key = key
+            logger.info(
+                "Query %s: Successfully stored results in backend, key: %s",
+                str(query.id),
+                key,
+            )
+
+
+def _serialize_result_set(
+    result_set: SupersetResultSet,
+) -> tuple[bytes | list[Any], list[Any]]:
+    """
+    Serialize result set based on RESULTS_BACKEND_USE_MSGPACK config.
+
+    When msgpack is enabled, uses Apache Arrow IPC format for efficiency.
+    Otherwise, falls back to JSON-serializable records.
+
+    :param result_set: Query result set to serialize
+    :returns: Tuple of (serialized_data, columns)
+    """
+    from superset import results_backend_use_msgpack
+    from superset.dataframe import df_to_records
+
+    if results_backend_use_msgpack:
+        if has_app_context():
+            stats_logger = app.config["STATS_LOGGER"]
+            with stats_timing(
+                "sqllab.query.results_backend_pa_serialization", stats_logger
+            ):
+                data: bytes | list[Any] = write_ipc_buffer(
+                    result_set.pa_table
+                ).to_pybytes()
+        else:
+            data = write_ipc_buffer(result_set.pa_table).to_pybytes()
+    else:
+        df = result_set.to_pandas_df()
+        data = df_to_records(df) or []
+
+    return (data, result_set.columns)
+
+
+@celery_app.task(name="query_execution.execute_sql")
+def execute_sql_task(
+    query_id: int,
+    rendered_query: str,
+    username: str | None = None,
+    start_time: float | None = None,
+) -> dict[str, Any] | None:
+    """
+    Execute SQL query asynchronously via Celery.
+
+    This task is used by SQLExecutor.execute_async() to run queries
+    in background workers with full feature support.
+
+    :param query_id: ID of the Query model
+    :param rendered_query: Pre-rendered SQL query to execute
+    :param username: Username for context override
+    :param start_time: Query start time for timing metrics
+    :returns: Query result payload or None
+    """
+    with app.test_request_context():
+        with override_user(security_manager.find_user(username)):
+            try:
+                return _execute_sql_statements(
+                    query_id,
+                    rendered_query,
+                    start_time=start_time,
+                )
+            except Exception as ex:
+                logger.debug("Query %d: %s", query_id, ex)
+                stats_logger = app.config["STATS_LOGGER"]
+                stats_logger.incr("error_sqllab_unhandled")
+                query = _get_query(query_id=query_id)
+                return _handle_query_error(ex, query)
+
+
+def _make_check_stopped_fn(query: Query) -> Any:
+    """Create a function to check if query was stopped."""
+
+    def check_stopped() -> bool:
+        db.session.refresh(query)
+        return query.status == QueryStatus.STOPPED
+
+    return check_stopped
+
+
+def _make_execute_fn(query: Query, db_engine_spec: Any) -> Any:
+    """Create an execute function with stats timing."""
+
+    def execute_with_stats(cursor: Any, sql: str) -> None:
+        query.executed_sql = sql
+        stats_logger = app.config["STATS_LOGGER"]
+        with stats_timing("sqllab.query.time_executing_query", stats_logger):
+            db_engine_spec.execute_with_cursor(cursor, sql, query)
+
+    return execute_with_stats
+
+
+def _make_log_query_fn(database: Any) -> Any:
+    """Create a query logging function."""
+
+    def log_query(sql: str, schema: str | None) -> None:
+        if log_query_fn := app.config.get("QUERY_LOGGER"):
+            log_query_fn(
+                database.sqlalchemy_uri,
+                sql,
+                schema,
+                __name__,
+                security_manager,
+                None,
+            )
+
+    return log_query
+
+
+def _execute_sql_statements(
+    query_id: int,
+    rendered_query: str,
+    start_time: float | None,
+) -> dict[str, Any] | None:
+    """Execute SQL statements and store results."""
+    if start_time:
+        stats_logger = app.config["STATS_LOGGER"]
+        stats_logger.timing("sqllab.query.time_pending", now_as_float() - 
start_time)
+
+    query = _get_query(query_id=query_id)
+    payload: dict[str, Any] = {"query_id": query_id}
+    database = query.database
+    db_engine_spec = database.db_engine_spec
+    db_engine_spec.patch()
+
+    logger.info("Query %s: Set query to 'running'", str(query_id))
+    query.status = QueryStatus.RUNNING
+    query.start_running_time = now_as_float()
+    execution_start_time = now_as_float()
+    db.session.commit()  # pylint: disable=consider-using-transaction
+
+    parsed_script, blocks = _prepare_statement_blocks(rendered_query, db_engine_spec)
+
+    with database.get_raw_connection(
+        catalog=query.catalog,
+        schema=query.schema,
+    ) as conn:
+        cursor = conn.cursor()
+
+        cancel_query_id = db_engine_spec.get_cancel_query_id(cursor, query)
+        if cancel_query_id is not None:
+            query.set_extra_json_key(QUERY_CANCEL_KEY, cancel_query_id)
+            db.session.commit()  # pylint: disable=consider-using-transaction
+
+        try:
+            execution_results = execute_sql_with_cursor(
+                database=database,
+                cursor=cursor,
+                statements=blocks,
+                query=query,
+                log_query_fn=_make_log_query_fn(database),
+                check_stopped_fn=_make_check_stopped_fn(query),
+                execute_fn=_make_execute_fn(query, db_engine_spec),
+            )
+        except SoftTimeLimitExceeded as ex:
+            query.status = QueryStatus.TIMED_OUT
+            logger.warning("Query %d: Time limit exceeded", query.id)
+            timeout_sec = app.config["SQLLAB_ASYNC_TIME_LIMIT_SEC"]
+            raise SupersetErrorException(
+                SupersetError(
+                    message=__(
+                        "The query was killed after %(sqllab_timeout)s seconds. "
+                        "It might be too complex, or the database might be "
+                        "under heavy load.",
+                        sqllab_timeout=timeout_sec,
+                    ),
+                    error_type=SupersetErrorType.SQLLAB_TIMEOUT_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            ) from ex

Review Comment:
   **Suggestion:** Logic bug: when a `SoftTimeLimitExceeded` occurs, the code
sets the query status to TIMED_OUT but then raises an exception that the outer
task handler catches, ultimately marking the query as FAILED and losing the
TIMED_OUT state. Instead, commit the TIMED_OUT status, record the error, and
return a payload immediately so the outer exception handler is not invoked and
the correct TIMED_OUT state is preserved. [logic error]
   
   **Severity Level:** Minor ⚠️
   ```suggestion
               # Preserve TIMED_OUT state and persist it immediately instead of
               # re-raising and letting the outer handler mark the query as FAILED.
               query.status = QueryStatus.TIMED_OUT
               logger.warning("Query %d: Time limit exceeded", query.id)
               timeout_sec = app.config["SQLLAB_ASYNC_TIME_LIMIT_SEC"]
               msg = __(
                   "The query was killed after %(sqllab_timeout)s seconds. "
                   "It might be too complex, or the database might be "
                   "under heavy load.",
                   sqllab_timeout=timeout_sec,
               )
               query.error_message = msg
               if not query.end_time:
                   query.end_time = now_as_float()
               db.session.commit()  # pylint: disable=consider-using-transaction
               payload.update({"status": query.status, "error": msg, "errors": 
[]})
               return payload
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   The outer caller (execute_sql_task) catches any Exception and runs 
_handle_query_error which sets the query to FAILED. Re-raising here will indeed 
overwrite the TIMED_OUT state. Returning a payload after persisting TIMED_OUT 
preserves the correct state. This is a real logic fix, not just style.
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/sql/execution/celery_task.py
   **Line:** 433:447
   **Comment:**
        *Logic Error: Logic bug: when a SoftTimeLimitExceeded occurs the code 
sets the query status to TIMED_OUT but then raises an exception which will be 
caught by the outer task handler and ultimately mark the query as FAILED; this 
loses the TIMED_OUT state. Instead, commit the TIMED_OUT status, record the 
error, and return a payload immediately so the outer exception handler is not 
invoked and the correct TIMED_OUT state is preserved.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
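
   If re-raising is preferred so that every failure flows through a single error
   path, another option is to make `_handle_query_error` respect a status that
   is already terminal; a hypothetical sketch of that variant:
   ```python
   # Inside _handle_query_error: only downgrade to FAILED when the query has not
   # already been marked TIMED_OUT (sketch, not the code in this PR).
   if query.status != QueryStatus.TIMED_OUT:
       query.status = QueryStatus.FAILED
   ```
   Either approach works; the important part is that exactly one place decides
   the final status.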



##########
tests/unit_tests/sql/execution/conftest.py:
##########
@@ -0,0 +1,315 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Shared fixtures and helpers for SQL execution tests.
+
+This module provides common mocks, fixtures, and helper functions used across
+test_celery_task.py and test_executor.py to reduce code duplication.
+"""
+
+from contextlib import contextmanager
+from typing import Any
+from unittest.mock import MagicMock
+
+import pandas as pd
+import pytest
+from flask import current_app
+from pytest_mock import MockerFixture
+
+from superset.common.db_query_status import QueryStatus as QueryStatusEnum
+from superset.models.core import Database
+
+# =============================================================================
+# Core Fixtures
+# =============================================================================
+
+
+@pytest.fixture(autouse=True)
+def mock_db_session(mocker: MockerFixture) -> MagicMock:
+    """Mock database session for all tests to avoid foreign key constraints."""
+    mock_session = MagicMock()
+    mocker.patch("superset.sql.execution.executor.db.session", mock_session)
+    mocker.patch("superset.sql.execution.celery_task.db.session", mock_session)
+    return mock_session
+
+
+@pytest.fixture
+def mock_query() -> MagicMock:
+    """Create a mock Query model."""
+    query = MagicMock()
+    query.id = 123
+    query.database_id = 1
+    query.sql = "SELECT * FROM users"
+    query.status = QueryStatusEnum.PENDING
+    query.error_message = None
+    query.progress = 0
+    query.end_time = None
+    query.start_running_time = None
+    query.executed_sql = None
+    query.tmp_table_name = None
+    query.catalog = None
+    query.schema = "public"
+    query.extra = {}
+    query.set_extra_json_key = MagicMock()
+    query.results_key = None
+    query.select_as_cta = False
+    query.rows = 0
+    query.to_dict = MagicMock(return_value={"id": 123})
+    query.database = MagicMock()
+    query.database.db_engine_spec.extract_errors.return_value = []
+    query.database.unique_name = "test_db"
+    query.database.cache_timeout = 300
+    return query
+
+
+@pytest.fixture
+def mock_database() -> MagicMock:
+    """Create a mock Database."""
+    database = MagicMock()
+    database.id = 1
+    database.unique_name = "test_db"
+    database.cache_timeout = 300
+    database.sqlalchemy_uri = "postgresql://localhost/test"
+    database.db_engine_spec = MagicMock()
+    database.db_engine_spec.engine = "postgresql"
+    database.db_engine_spec.run_multiple_statements_as_one = False
+    database.db_engine_spec.allows_sql_comments = True
+    database.db_engine_spec.extract_errors = MagicMock(return_value=[])
+    database.db_engine_spec.execute_with_cursor = MagicMock()
+    database.db_engine_spec.get_cancel_query_id = MagicMock(return_value=None)
+    database.db_engine_spec.patch = MagicMock()
+    database.db_engine_spec.fetch_data = MagicMock(return_value=[])
+    return database
+
+
+@pytest.fixture
+def mock_result_set() -> MagicMock:
+    """Create a mock SupersetResultSet."""
+    result_set = MagicMock()
+    result_set.size = 2
+    result_set.columns = [{"name": "id"}, {"name": "name"}]
+    result_set.pa_table = MagicMock()
+    result_set.to_pandas_df = MagicMock(
+        return_value=pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"]})
+    )
+    return result_set
+
+
+@pytest.fixture
+def database() -> Database:
+    """Create a real test database instance."""
+    return Database(
+        id=1,
+        database_name="test_db",
+        sqlalchemy_uri="sqlite://",
+        allow_dml=False,
+    )
+
+
+@pytest.fixture
+def database_with_dml() -> Database:
+    """Create a real test database instance with DML allowed."""
+    return Database(
+        id=2,
+        database_name="test_db_dml",
+        sqlalchemy_uri="sqlite://",
+        allow_dml=True,
+    )
+
+
+# =============================================================================
+# Helper Functions for Mock Creation
+# =============================================================================
+
+
+def create_mock_cursor(
+    column_names: list[str],
+    data: list[tuple[Any, ...]] | None = None,
+) -> MagicMock:
+    """
+    Create a mock database cursor with column description.
+
+    Args:
+        column_names: List of column names
+        data: Optional data to return from fetchall()
+
+    Returns:
+        Configured MagicMock cursor
+    """
+    mock_cursor = MagicMock()
+    mock_cursor.description = [(name,) for name in column_names]

Review Comment:
   **Suggestion:** `cursor.description` is set to single-element tuples 
[(name,)] but DB-API description entries are typically 7-item sequences; code 
that expects a full description tuple (e.g. unpacking fields) may fail—use 
7-element tuples with placeholders to better mimic real cursor descriptions. 
[possible bug]
   
   **Severity Level:** Critical 🚨
   ```suggestion
       # DB-API cursor.description is a sequence of 7-item tuples:
       # (name, type_code, display_size, internal_size, precision, scale, null_ok)
       mock_cursor.description = [
           (name, None, None, None, None, None, None) for name in column_names
       ]
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   Good catch — DB-API `cursor.description` entries are 7-item sequences; using 
full 7-tuples better mimics real cursors and prevents failures in code that 
inspects other fields or unpacks the tuple. This improves test fidelity.
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/unit_tests/sql/execution/conftest.py
   **Line:** 155:155
   **Comment:**
        *Possible Bug: `cursor.description` is set to single-element tuples 
[(name,)] but DB-API description entries are typically 7-item sequences; code 
that expects a full description tuple (e.g. unpacking fields) may fail—use 
7-element tuples with placeholders to better mimic real cursor descriptions.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
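
   For context, a short illustration (placeholder values) of why the full 7-item
   shape is safer for code that consumes `cursor.description`:
   ```python
   # DB-API description entry: (name, type_code, display_size, internal_size,
   # precision, scale, null_ok)
   description = [(name, None, None, None, None, None, None) for name in ("id", "name")]

   # Reading only the name works with either shape...
   column_names = [entry[0] for entry in description]

   # ...but unpacking more fields raises ValueError when entries are 1-tuples.
   name, type_code, *_rest = description[0]
   ```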



##########
tests/unit_tests/sql/execution/test_executor.py:
##########
@@ -0,0 +1,2110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Tests for SQLExecutor.
+
+These tests cover the SQL execution API including:
+- Basic execution
+- DML handling
+- Jinja2 template rendering
+- CTAS/CVAS support
+- Security features (RLS, disallowed functions)
+- Result caching
+- Query model persistence
+- Async execution
+"""
+
+from typing import Any
+from unittest.mock import MagicMock
+
+import msgpack
+import pandas as pd
+import pytest
+from flask import current_app
+from pytest_mock import MockerFixture
+from superset_core.api.types import (
+    CacheOptions,
+    QueryOptions,
+    QueryStatus,
+)
+
+from superset.models.core import Database
+
+# Note: database, database_with_dml, mock_db_session fixtures and
+# mock_query_execution helper are imported from conftest.py
+from .conftest import mock_query_execution
+
+# =============================================================================
+# Basic Execution Tests
+# =============================================================================
+
+
+def test_execute_select_success(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test successful SELECT query execution."""
+    mock_query_execution(
+        mocker,
+        database,
+        return_data=[(1, "Alice"), (2, "Bob")],
+        column_names=["id", "name"],
+    )
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    result = database.execute("SELECT id, name FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+    assert len(result.statements) == 1
+    assert result.statements[0].data is not None
+    assert result.statements[0].row_count == 2
+    assert result.error_message is None
+
+
+def test_execute_with_options(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test query execution with custom options."""
+    mock_query_execution(mocker, database, return_data=[(100,)], column_names=["count"])
+    get_raw_conn_mock = mocker.patch.object(
+        database,
+        "get_raw_connection",
+        wraps=database.get_raw_connection,
+    )
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    options = QueryOptions(catalog="main", schema="public", limit=50)
+    result = database.execute("SELECT COUNT(*) FROM users", options=options)
+
+    assert result.status == QueryStatus.SUCCESS
+    get_raw_conn_mock.assert_called_once()
+    call_kwargs = get_raw_conn_mock.call_args[1]
+    assert call_kwargs["catalog"] == "main"
+    assert call_kwargs["schema"] == "public"
+
+
+def test_execute_records_execution_time(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that execution time is recorded."""
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    result = database.execute("SELECT id FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+    assert result.total_execution_time_ms is not None
+    assert result.total_execution_time_ms >= 0
+
+
+def test_execute_creates_query_record(
+    mocker: MockerFixture,
+    database: Database,
+    app_context: None,
+    mock_db_session: MagicMock,
+) -> None:
+    """Test that execute creates a Query record for audit."""
+    from superset.sql.execution.executor import SQLExecutor
+
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    # Mock _create_query_record to return a mock query with ID
+    mock_query = MagicMock()
+    mock_query.id = 123
+    mock_create_query = mocker.patch.object(
+        SQLExecutor, "_create_query_record", return_value=mock_query
+    )
+
+    result = database.execute("SELECT id FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+    assert result.query_id == 123
+    mock_create_query.assert_called_once()
+
+
+# =============================================================================
+# DML Handling Tests
+# =============================================================================
+
+
+def test_execute_dml_without_permission(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that DML queries fail when database.allow_dml is False."""
+    mocker.patch.dict(
+        current_app.config,
+        {"SQL_QUERY_MUTATOR": None, "SQLLAB_TIMEOUT": 30},
+    )
+
+    result = database.execute("INSERT INTO users (name) VALUES ('test')")
+
+    assert result.status == QueryStatus.FAILED
+    assert result.error_message is not None
+    assert "DML queries are not allowed" in result.error_message
+
+
+def test_execute_dml_with_permission(
+    mocker: MockerFixture, database_with_dml: Database, app_context: None
+) -> None:
+    """Test that DML queries succeed when database.allow_dml is True."""
+    mock_query_execution(mocker, database_with_dml, return_data=[], column_names=[])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    result = database_with_dml.execute("INSERT INTO users (name) VALUES 
('test')")
+
+    assert result.status == QueryStatus.SUCCESS
+
+
+def test_execute_update_without_permission(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that UPDATE queries fail when database.allow_dml is False."""
+    mocker.patch.dict(
+        current_app.config,
+        {"SQL_QUERY_MUTATOR": None, "SQLLAB_TIMEOUT": 30},
+    )
+
+    result = database.execute("UPDATE users SET name = 'test' WHERE id = 1")
+
+    assert result.status == QueryStatus.FAILED
+    assert result.error_message is not None
+    assert "DML queries are not allowed" in result.error_message
+
+
+def test_execute_delete_without_permission(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that DELETE queries fail when database.allow_dml is False."""
+    mocker.patch.dict(
+        current_app.config,
+        {"SQL_QUERY_MUTATOR": None, "SQLLAB_TIMEOUT": 30},
+    )
+
+    result = database.execute("DELETE FROM users WHERE id = 1")
+
+    assert result.status == QueryStatus.FAILED
+    assert result.error_message is not None
+    assert "DML queries are not allowed" in result.error_message
+
+
+# =============================================================================
+# Jinja2 Template Rendering Tests
+# =============================================================================
+
+
+def test_execute_with_template_params(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test query execution with Jinja2 template parameters."""
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    # Mock the template processor
+    mock_tp = MagicMock()
+    mock_tp.process_template.return_value = (
+        "SELECT * FROM events WHERE date > '2024-01-01'"
+    )
+    mocker.patch(
+        "superset.jinja_context.get_template_processor",
+        return_value=mock_tp,
+    )
+
+    options = QueryOptions(
+        template_params={"table": "events", "start_date": "2024-01-01"}
+    )
+    result = database.execute(
+        "SELECT * FROM {{ table }} WHERE date > '{{ start_date }}'",
+        options=options,
+    )
+
+    assert result.status == QueryStatus.SUCCESS
+    mock_tp.process_template.assert_called_once()
+
+
+def test_execute_without_template_params_no_rendering(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that template rendering is skipped when no params provided."""
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    mock_get_tp = mocker.patch("superset.jinja_context.get_template_processor")
+
+    result = database.execute("SELECT * FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+    mock_get_tp.assert_not_called()
+
+
+# =============================================================================
+# Disallowed Functions Tests
+# =============================================================================
+
+
+def test_execute_disallowed_functions(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that disallowed SQL functions are blocked."""
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "DISALLOWED_SQL_FUNCTIONS": {"sqlite": {"LOAD_EXTENSION"}},
+        },
+    )
+
+    result = database.execute("SELECT LOAD_EXTENSION('malicious.so')")
+
+    assert result.status == QueryStatus.FAILED
+    assert result.error_message is not None
+    assert "Disallowed SQL functions" in result.error_message
+
+
+def test_execute_allowed_functions(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that allowed SQL functions work normally."""
+    mock_query_execution(mocker, database, return_data=[(5,)], column_names=["count"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "DISALLOWED_SQL_FUNCTIONS": {"sqlite": {"LOAD_EXTENSION"}},
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    result = database.execute("SELECT COUNT(*) FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+
+
+# =============================================================================
+# Row-Level Security Tests
+# =============================================================================
+
+
+def test_execute_rls_applied(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that RLS is always applied."""
+    from superset.sql.execution.executor import SQLExecutor
+
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    # Mock _apply_rls_to_script to verify it's always called
+    mock_apply_rls = mocker.patch.object(SQLExecutor, "_apply_rls_to_script")
+
+    result = database.execute("SELECT * FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+    mock_apply_rls.assert_called()
+
+
+# =============================================================================
+# Result Caching Tests
+# =============================================================================
+
+
+def test_execute_returns_cached_result(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that cached results are returned when available."""
+    from superset.sql.execution.executor import SQLExecutor
+
+    cached_df = pd.DataFrame({"id": [1, 2]})
+
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    # Mock _get_from_cache to simulate a cache hit
+    cached_result = MagicMock()
+    cached_result.status = QueryStatus.SUCCESS
+    cached_result.data = cached_df
+    cached_result.is_cached = True
+    mocker.patch.object(SQLExecutor, "_get_from_cache", 
return_value=cached_result)
+
+    # get_raw_connection should NOT be called if cache hit
+    get_conn_mock = mocker.patch.object(database, "get_raw_connection")
+
+    result = database.execute("SELECT * FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+    assert result.is_cached is True
+    get_conn_mock.assert_not_called()
+
+
+def test_execute_force_cache_refresh(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that force_cache_refresh bypasses the cache."""
+    from superset.sql.execution.executor import SQLExecutor
+
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    # Mock _get_from_cache - should NOT be called when force_refresh=True
+    mock_get_cache = mocker.patch.object(SQLExecutor, "_get_from_cache")
+
+    options = QueryOptions(cache=CacheOptions(force_refresh=True))
+    result = database.execute("SELECT * FROM users", options=options)
+
+    assert result.status == QueryStatus.SUCCESS
+    assert result.is_cached is False
+    assert sum(s.row_count for s in result.statements) == 1  # Fresh result
+    mock_get_cache.assert_not_called()
+
+
+def test_execute_stores_in_cache(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that results are stored in cache."""
+    from superset.sql.execution.executor import SQLExecutor
+
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "CACHE_DEFAULT_TIMEOUT": 300,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    # Mock _get_from_cache to return None (cache miss)
+    mocker.patch.object(SQLExecutor, "_get_from_cache", return_value=None)
+    # Mock _store_in_cache to verify it gets called
+    mock_store_cache = mocker.patch.object(SQLExecutor, "_store_in_cache")
+
+    result = database.execute("SELECT * FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+    mock_store_cache.assert_called_once()
+
+
+# =============================================================================
+# Timeout Tests
+# =============================================================================
+
+
+def test_execute_timeout(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test query timeout handling."""
+    from superset.errors import ErrorLevel, SupersetErrorType
+    from superset.exceptions import SupersetTimeoutException
+
+    # Mock get_raw_connection to raise timeout
+    mock_conn = MagicMock()
+    mock_conn.__enter__ = MagicMock(
+        side_effect=SupersetTimeoutException(
+            error_type=SupersetErrorType.GENERIC_BACKEND_ERROR,
+            message="Query timed out",
+            level=ErrorLevel.ERROR,
+        )
+    )
+    mock_conn.__exit__ = MagicMock(return_value=False)
+    mocker.patch.object(database, "get_raw_connection", return_value=mock_conn)
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 1,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    result = database.execute("SELECT * FROM large_table")
+
+    assert result.status == QueryStatus.TIMED_OUT
+    assert result.error_message is not None
+
+
+def test_execute_custom_timeout(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test query with custom timeout option."""
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    mock_timeout = mocker.patch("superset.sql.execution.executor.utils.timeout")
+    mock_timeout.return_value.__enter__ = MagicMock()
+    mock_timeout.return_value.__exit__ = MagicMock(return_value=False)
+
+    options = QueryOptions(timeout_seconds=60)
+    result = database.execute("SELECT * FROM users", options=options)
+
+    assert result.status == QueryStatus.SUCCESS
+    mock_timeout.assert_called_with(
+        seconds=60,
+        error_message="Query exceeded the 60 seconds timeout.",
+    )
+
+
+# =============================================================================
+# Error Handling Tests
+# =============================================================================
+
+
+def test_execute_error(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test general error handling."""
+    # Mock get_raw_connection to raise an error
+    mock_conn = MagicMock()
+    mock_cursor = MagicMock()
+    mock_conn.cursor.return_value = mock_cursor
+    mock_conn.__enter__ = MagicMock(return_value=mock_conn)
+    mock_conn.__exit__ = MagicMock(return_value=False)
+    mocker.patch.object(database, "get_raw_connection", return_value=mock_conn)
+    mocker.patch.object(
+        database, "mutate_sql_based_on_config", side_effect=lambda sql, **kw: 
sql
+    )
+    mocker.patch.object(
+        database.db_engine_spec, "execute", side_effect=Exception("Database 
error")
+    )
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    result = database.execute("SELECT * FROM nonexistent")
+
+    assert result.status == QueryStatus.FAILED
+    assert result.error_message is not None
+    assert "Database error" in result.error_message
+
+
+# =============================================================================
+# Async Execution Tests
+# =============================================================================
+
+
+def test_execute_async_creates_query(
+    mocker: MockerFixture,
+    database: Database,
+    app_context: None,
+    mock_db_session: MagicMock,
+) -> None:
+    """Test that execute_async creates a Query record and submits to Celery."""
+    mocker.patch.dict(
+        current_app.config, {"SQL_QUERY_MUTATOR": None, "SQLLAB_TIMEOUT": 30}
+    )
+
+    # Mock db.session.add to set query.id (simulating database auto-increment)
+    def set_query_id(query: Any) -> None:
+        if not hasattr(query, "id") or query.id is None:
+            query.id = 123
+
+    mock_db_session.add.side_effect = set_query_id
+
+    mock_celery_task = mocker.patch(
+        "superset.sql.execution.celery_task.execute_sql_task"
+    )
+
+    result = database.execute_async("SELECT * FROM users")
+
+    assert result.status == QueryStatus.PENDING
+    assert result.query_id is not None
+    assert result.query_id == 123
+    mock_db_session.add.assert_called()
+    mock_celery_task.delay.assert_called()
+
+
+def test_execute_async_with_options(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test async execution with custom options."""
+    mocker.patch.dict(
+        current_app.config, {"SQL_QUERY_MUTATOR": None, "SQLLAB_TIMEOUT": 30}
+    )
+
+    mock_celery_task = mocker.patch(
+        "superset.sql.execution.celery_task.execute_sql_task"
+    )
+
+    options = QueryOptions(catalog="analytics", schema="reports")
+    result = database.execute_async("SELECT * FROM sales", options=options)
+
+    assert result.status == QueryStatus.PENDING
+    mock_celery_task.delay.assert_called_once()
+
+
+def test_execute_async_dml_without_permission_raises(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that async DML queries raise exception when not allowed."""
+    from superset.exceptions import SupersetSecurityException
+
+    mocker.patch.dict(
+        current_app.config, {"SQL_QUERY_MUTATOR": None, "SQLLAB_TIMEOUT": 30}
+    )
+
+    with pytest.raises(SupersetSecurityException, match="DML queries are not allowed"):
+        database.execute_async("INSERT INTO users (name) VALUES ('test')")
+
+
+def test_async_handle_get_status(
+    mocker: MockerFixture,
+    database: Database,
+    app_context: None,
+    mock_db_session: MagicMock,
+) -> None:
+    """Test that async handle can retrieve query status."""
+    from superset.models.sql_lab import Query
+
+    mock_query = MagicMock(spec=Query)
+    mock_query.status = "success"
+    filter_mock = mock_db_session.query.return_value.filter_by.return_value
+    filter_mock.one_or_none.return_value = mock_query
+
+    mocker.patch.dict(
+        current_app.config, {"SQL_QUERY_MUTATOR": None, "SQLLAB_TIMEOUT": 30}
+    )
+    mocker.patch("superset.sql.execution.celery_task.execute_sql_task")
+
+    result = database.execute_async("SELECT * FROM users")
+
+    status = result.get_status()
+    assert status == QueryStatus.SUCCESS
+
+
+def test_async_handle_cancel(
+    mocker: MockerFixture,
+    database: Database,
+    app_context: None,
+    mock_db_session: MagicMock,
+) -> None:
+    """Test that async handle can cancel a query."""
+    from superset.models.sql_lab import Query
+    from superset.sql.execution.executor import SQLExecutor
+
+    mock_query = MagicMock(spec=Query)
+    mock_query.status = "running"
+    mock_query.database = database
+    filter_mock = mock_db_session.query.return_value.filter_by.return_value
+    filter_mock.one_or_none.return_value = mock_query
+
+    mocker.patch.dict(
+        current_app.config, {"SQL_QUERY_MUTATOR": None, "SQLLAB_TIMEOUT": 30}
+    )
+    mocker.patch("superset.sql.execution.celery_task.execute_sql_task")
+    mock_cancel = mocker.patch.object(SQLExecutor, "_cancel_query", return_value=True)
+
+    result = database.execute_async("SELECT * FROM users")
+
+    cancelled = result.cancel()
+    assert cancelled is True
+    mock_cancel.assert_called_once()
+
+
+# =============================================================================
+# SQL Mutation Tests
+# =============================================================================
+
+
+def test_execute_applies_sql_mutator(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that SQL mutator is applied internally."""
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+
+    # Track what SQL gets mutated
+    mutate_mock = mocker.patch.object(
+        database, "mutate_sql_based_on_config", side_effect=lambda sql, **kw: 
sql
+    )
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": lambda sql, **kwargs: f"/* mutated */ {sql}",
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    result = database.execute("SELECT id FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+    # Verify mutate_sql_based_on_config was called
+    mutate_mock.assert_called()
+
+
+# =============================================================================
+# Progress Tracking Tests
+# =============================================================================
+
+
+def test_execute_multi_statement_updates_query_progress(
+    mocker: MockerFixture,
+    database: Database,
+    app_context: None,
+    mock_db_session: MagicMock,
+) -> None:
+    """Test that multi-statement execution updates Query.progress."""
+    from superset.models.sql_lab import Query as QueryModel
+    from superset.result_set import SupersetResultSet
+
+    # Mock raw connection for multi-statement execution
+    mock_cursor = MagicMock()
+    mock_cursor.description = [("id",), ("name",)]
+    mock_conn = MagicMock()
+    mock_conn.cursor.return_value = mock_cursor
+    mock_conn.__enter__ = MagicMock(return_value=mock_conn)
+    mock_conn.__exit__ = MagicMock(return_value=False)
+
+    mocker.patch.object(database, "get_raw_connection", return_value=mock_conn)
+    mocker.patch.object(
+        database,
+        "mutate_sql_based_on_config",
+        side_effect=lambda sql, **kw: sql,
+    )
+    mocker.patch.object(database.db_engine_spec, "execute")
+    mocker.patch.object(
+        database.db_engine_spec, "fetch_data", return_value=[("1", "Alice")]
+    )
+
+    mock_result_set = MagicMock(spec=SupersetResultSet)
+    mock_result_set.to_pandas_df.return_value = pd.DataFrame(
+        {"id": ["1"], "name": ["Alice"]}
+    )
+    mocker.patch("superset.result_set.SupersetResultSet", 
return_value=mock_result_set)
+
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    # Track progress updates on the Query model
+    mock_query = MagicMock(spec=QueryModel)
+    mock_query.id = 123
+    mocker.patch("superset.models.sql_lab.Query", return_value=mock_query)
+
+    # Execute multiple statements
+    result = database.execute("SELECT 1; SELECT 2;")
+
+    assert result.status == QueryStatus.SUCCESS
+    # Query.progress should have been updated
+    assert mock_query.progress == 100  # Final progress after completion
+    # set_extra_json_key should have been called for progress messages
+    assert mock_query.set_extra_json_key.call_count >= 2
+
+
+# =============================================================================
+# Query Logging Tests
+# =============================================================================
+
+
+def test_execute_calls_query_logger(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that QUERY_LOGGER is called when configured."""
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    log_calls: list[tuple[str, str, str | None]] = []
+
+    def mock_logger(db, sql, schema, module, security_manager, context) -> None:  # noqa: ARG001
+        log_calls.append((db.database_name, sql, schema))
+
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": mock_logger,
+        },
+    )
+
+    result = database.execute(
+        "SELECT * FROM users", options=QueryOptions(schema="public")
+    )
+
+    assert result.status == QueryStatus.SUCCESS
+    assert len(log_calls) == 1
+    assert log_calls[0][0] == "test_db"
+    # SQL may be formatted/prettified, so check for key parts
+    logged_sql = log_calls[0][1]
+    assert "SELECT" in logged_sql
+    assert "FROM users" in logged_sql
+    assert log_calls[0][2] == "public"
+
+
+def test_execute_no_query_logger_configured(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test that execution works when QUERY_LOGGER is not configured."""
+    mock_query_execution(mocker, database, return_data=[(1,)], column_names=["id"])
+    mocker.patch.dict(
+        current_app.config,
+        {
+            "SQL_QUERY_MUTATOR": None,
+            "SQLLAB_TIMEOUT": 30,
+            "SQL_MAX_ROW": None,
+            "QUERY_LOGGER": None,
+        },
+    )
+
+    # Should not raise any errors
+    result = database.execute("SELECT * FROM users")
+
+    assert result.status == QueryStatus.SUCCESS
+
+
+# =============================================================================
+# Dry Run Tests
+# =============================================================================
+
+
+def test_execute_dry_run_returns_transformed_sql(
+    mocker: MockerFixture, database: Database, app_context: None
+) -> None:
+    """Test dry run returns transformed SQL without execution."""
+    from superset.sql.execution.executor import SQLExecutor

Review Comment:
   **Suggestion:** The inline `mock_logger` in the test declares a fixed 
parameter list. If the production `QUERY_LOGGER` is invoked with a different 
signature (additional/removed parameters), the test will raise a TypeError; 
accept arbitrary args/kwargs in the mock and extract only the needed values to 
avoid signature mismatches. [possible bug]
   
   **Severity Level:** Critical 🚨
   ```suggestion
       def mock_logger(*args, **kwargs) -> None:  # noqa: ARG001
           db = args[0] if len(args) > 0 else kwargs.get("db")
           sql = args[1] if len(args) > 1 else kwargs.get("sql")
           schema = args[2] if len(args) > 2 else kwargs.get("schema")
           log_calls.append((getattr(db, "database_name", None), sql, schema))
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   The suggestion is valid: making the test logger accept *args/**kwargs makes 
the test less brittle and avoids a surprising TypeError if the production 
QUERY_LOGGER is invoked with a different signature. The proposed improved code 
safely extracts the needed values and uses getattr for database_name so it 
won't crash if db is missing or has a different shape. This is a small, 
sensible hardening of the test.
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** tests/unit_tests/sql/execution/test_executor.py
   **Line:** 880:881
   **Comment:**
        *Possible Bug: The inline `mock_logger` in the test declares a fixed 
parameter list. If the production `QUERY_LOGGER` is invoked with a different 
signature (additional/removed parameters), the test will raise a TypeError; 
accept arbitrary args/kwargs in the mock and extract only the needed values to 
avoid signature mismatches.
   
   Validate the correctness of the flagged issue. If correct, How can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
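
   If hand-rolled argument handling feels heavy, a further simplification is to
   use a `MagicMock` as the logger spy, since it accepts any call signature; a
   sketch (the direct call below only simulates what QUERY_LOGGER would do):
   ```python
   from unittest.mock import MagicMock

   mock_logger = MagicMock()

   # In the test, QUERY_LOGGER would be set to mock_logger; simulated call here.
   mock_logger("test_db", "SELECT * FROM users", "public", __name__, None, None)

   mock_logger.assert_called_once()
   args, kwargs = mock_logger.call_args
   logged_sql = args[1] if len(args) > 1 else kwargs.get("sql")
   assert "FROM users" in logged_sql
   ```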



##########
superset/sql/execution/executor.py:
##########
@@ -0,0 +1,1080 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+SQL Executor implementation for Database.execute() and execute_async().
+
+This module provides the SQLExecutor class that implements the query execution
+methods defined in superset_core.api.models.Database.
+
+Implementation Features
+-----------------------
+
+Query Preparation (applies to both sync and async):
+- Jinja2 template rendering (via template_params in QueryOptions)
+- SQL mutation via SQL_QUERY_MUTATOR config hook
+- DML permission checking (requires database.allow_dml=True for DML)
+- Disallowed functions checking via DISALLOWED_SQL_FUNCTIONS config
+- Row-level security (RLS) via AST transformation (always applied)
+- Result limit application via SQL_MAX_ROW config
+- Catalog/schema resolution and validation
+
+Synchronous Execution (execute):
+- Multi-statement SQL parsing and execution
+- Progress tracking via Query model
+- Result caching via cache_manager.data_cache
+- Query logging via QUERY_LOGGER config hook
+- Timeout protection via SQLLAB_TIMEOUT config
+- Dry run mode (returns transformed SQL without execution)
+
+Asynchronous Execution (execute_async):
+- Celery task submission for background execution
+- Security validation before submission
+- Query model creation with PENDING status
+- Result caching check (returns cached if available)
+- Background execution with timeout via SQLLAB_ASYNC_TIME_LIMIT_SEC
+- Results stored in results backend for retrieval
+- Handle-based progress tracking and cancellation
+
+See Database.execute() and Database.execute_async() docstrings in
+superset_core.api.models for the public API contract.
+"""
+
+from __future__ import annotations
+
+import logging
+import time
+from datetime import datetime
+from typing import Any, TYPE_CHECKING
+
+from flask import current_app as app, g, has_app_context
+
+from superset import db
+from superset.errors import ErrorLevel, SupersetError, SupersetErrorType
+from superset.exceptions import (
+    SupersetSecurityException,
+    SupersetTimeoutException,
+)
+from superset.extensions import cache_manager
+from superset.sql.parse import SQLScript
+from superset.utils import core as utils
+
+if TYPE_CHECKING:
+    from superset_core.api.types import (
+        AsyncQueryHandle,
+        QueryOptions,
+        QueryResult,
+    )
+
+    from superset.models.core import Database
+    from superset.result_set import SupersetResultSet
+
+logger = logging.getLogger(__name__)
+
+
+def execute_sql_with_cursor(
+    database: Database,
+    cursor: Any,
+    statements: list[str],
+    query: Any,
+    log_query_fn: Any | None = None,
+    check_stopped_fn: Any | None = None,
+    execute_fn: Any | None = None,
+) -> list[tuple[str, SupersetResultSet | None, float, int]]:
+    """
+    Execute SQL statements with a cursor and return all result sets.
+
+    This is the shared execution logic used by both sync (SQLExecutor) and
+    async (celery_task) execution paths. It handles multi-statement execution
+    with progress tracking via the Query model.
+
+    :param database: Database model to execute against
+    :param cursor: Database cursor to use for execution
+    :param statements: List of SQL statements to execute
+    :param query: Query model for progress tracking
+    :param log_query_fn: Optional function to log queries, called as fn(sql, schema)
+    :param check_stopped_fn: Optional function to check if query was stopped.
+        Should return True if stopped. Used by async execution for cancellation.
+    :param execute_fn: Optional custom execute function. If not provided, uses
+        database.db_engine_spec.execute(cursor, sql, database). Custom function
+        should accept (cursor, sql) and handle execution.
+    :returns: List of (statement_sql, result_set, execution_time_ms, rowcount) tuples
+        Returns empty list if stopped. Raises exception on error (fail-fast).
+    """
+    from superset.result_set import SupersetResultSet
+
+    total = len(statements)
+    if total == 0:
+        return []
+
+    results: list[tuple[str, SupersetResultSet | None, float, int]] = []
+
+    for i, statement in enumerate(statements):
+        # Check if query was stopped (async cancellation)
+        if check_stopped_fn and check_stopped_fn():
+            return results
+
+        stmt_start_time = time.time()
+
+        # Apply SQL mutation
+        stmt_sql = database.mutate_sql_based_on_config(
+            statement,
+            is_split=True,
+        )
+
+        # Log query
+        if log_query_fn:
+            log_query_fn(stmt_sql, query.schema)
+
+        # Execute - use custom function or default
+        if execute_fn:
+            execute_fn(cursor, stmt_sql)
+        else:
+            database.db_engine_spec.execute(cursor, stmt_sql, database)
+
+        stmt_execution_time = (time.time() - stmt_start_time) * 1000
+
+        # Fetch results from ALL statements
+        description = cursor.description
+        if description:
+            rows = database.db_engine_spec.fetch_data(cursor)
+            result_set = SupersetResultSet(
+                rows,
+                description,
+                database.db_engine_spec,
+            )
+        else:
+            # DML statement - no result set
+            result_set = None
+
+        # Get row count for DML statements
+        rowcount = cursor.rowcount if hasattr(cursor, "rowcount") else 0
+
+        results.append((stmt_sql, result_set, stmt_execution_time, rowcount))
+
+        # Update progress on Query model
+        progress_pct = int(((i + 1) / total) * 100)
+        query.progress = progress_pct
+        query.set_extra_json_key(
+            "progress",
+            f"Running statement {i + 1} of {total}",
+        )
+        db.session.commit()  # pylint: disable=consider-using-transaction
+
+    return results
+
+
+class SQLExecutor:
+    """
+    SQL query executor implementation.
+
+    Implements Database.execute() and execute_async() methods.
+    See superset_core.api.models.Database for the full public API documentation.
+    """
+
+    def __init__(self, database: Database) -> None:
+        """
+        Initialize the executor with a database.
+
+        :param database: Database model instance to execute queries against
+        """
+        self.database = database
+
+    def execute(
+        self,
+        sql: str,
+        options: QueryOptions | None = None,
+    ) -> QueryResult:
+        """
+        Execute SQL synchronously.
+
+        If options.dry_run=True, returns the transformed SQL without execution.
+        All transformations (RLS, templates, limits) are still applied.
+
+        See superset_core.api.models.Database.execute() for full documentation.
+        """
+        from superset_core.api.types import (
+            QueryOptions as QueryOptionsType,
+            QueryResult as QueryResultType,
+            QueryStatus,
+            StatementResult,
+        )
+
+        opts: QueryOptionsType = options or QueryOptionsType()
+        start_time = time.time()
+
+        try:
+            # 1. Prepare SQL (assembly only, no security checks)
+            script, catalog, schema = self._prepare_sql(sql, opts)
+
+            # 2. Security checks
+            self._check_security(script)
+
+            # 3. Get mutation status and format SQL
+            has_mutation = script.has_mutation()
+            final_sql = script.format()
+
+            # DRY RUN: Return transformed SQL without execution
+            if opts.dry_run:
+                total_execution_time_ms = (time.time() - start_time) * 1000
+                # Create a StatementResult for each statement in dry-run mode
+                dry_run_statements = [
+                    StatementResult(
+                        statement=stmt.format(),
+                        data=None,
+                        row_count=0,
+                        execution_time_ms=0,
+                    )
+                    for stmt in script.statements
+                ]
+                return QueryResultType(
+                    status=QueryStatus.SUCCESS,
+                    statements=dry_run_statements,
+                    query_id=None,
+                    total_execution_time_ms=total_execution_time_ms,
+                    is_cached=False,
+                )
+
+            # 4. Check cache
+            cached_result = self._try_get_cached_result(has_mutation, final_sql, opts)
+            if cached_result:
+                return cached_result
+
+            # 5. Create Query model for audit
+            query = self._create_query_record(
+                final_sql, opts, catalog, schema, status="running"
+            )
+
+            # 6. Execute with timeout
+            timeout = opts.timeout_seconds or app.config.get("SQLLAB_TIMEOUT", 30)
+            timeout_msg = f"Query exceeded the {timeout} seconds timeout."
+
+            with utils.timeout(seconds=timeout, error_message=timeout_msg):
+                statement_results = self._execute_statements(
+                    final_sql,
+                    catalog,
+                    schema,
+                    query,
+                )
+
+            total_execution_time_ms = (time.time() - start_time) * 1000
+
+            # Calculate total row count for Query model
+            total_rows = sum(stmt.row_count for stmt in statement_results)
+
+            # Update query record
+            query.status = "success"
+            query.rows = total_rows
+            query.progress = 100
+            db.session.commit()  # pylint: disable=consider-using-transaction
+
+            result = QueryResultType(
+                status=QueryStatus.SUCCESS,
+                statements=statement_results,
+                query_id=query.id,
+                total_execution_time_ms=total_execution_time_ms,
+            )
+
+            # Store in cache (if SELECT and caching enabled)
+            if not has_mutation:
+                self._store_in_cache(result, final_sql, opts)
+
+            return result
+
+        except SupersetTimeoutException:
+            return self._create_error_result(
+                QueryStatus.TIMED_OUT,
+                "Query exceeded the timeout limit",
+                sql,
+                start_time,
+            )
+        except SupersetSecurityException as ex:
+            return self._create_error_result(
+                QueryStatus.FAILED, str(ex), sql, start_time
+            )
+        except Exception as ex:
+            error_msg = self.database.db_engine_spec.extract_error_message(ex)
+            return self._create_error_result(
+                QueryStatus.FAILED, error_msg, sql, start_time
+            )
+
+    def execute_async(
+        self,
+        sql: str,
+        options: QueryOptions | None = None,
+    ) -> AsyncQueryHandle:
+        """
+        Execute SQL asynchronously via Celery.
+
+        If options.dry_run=True, returns the transformed SQL as a completed
+        AsyncQueryHandle without submitting to Celery.
+
+        See superset_core.api.models.Database.execute_async() for full documentation.
+        """
+        from superset_core.api.types import (
+            QueryOptions as QueryOptionsType,
+            QueryResult as QueryResultType,
+            QueryStatus,
+        )
+
+        opts: QueryOptionsType = options or QueryOptionsType()
+
+        # 1. Prepare SQL (assembly only, no security checks)
+        script, catalog, schema = self._prepare_sql(sql, opts)
+
+        # 2. Security checks
+        self._check_security(script)
+
+        # 3. Get mutation status and format SQL
+        has_mutation = script.has_mutation()
+        final_sql = script.format()
+
+        # DRY RUN: Return transformed SQL as completed async handle
+        if opts.dry_run:
+            from superset_core.api.types import StatementResult
+
+            dry_run_statements = [
+                StatementResult(
+                    statement=stmt.format(),
+                    data=None,
+                    row_count=0,
+                    execution_time_ms=0,
+                )
+                for stmt in script.statements
+            ]
+            dry_run_result = QueryResultType(
+                status=QueryStatus.SUCCESS,
+                statements=dry_run_statements,
+                query_id=None,
+                total_execution_time_ms=0,
+                is_cached=False,
+            )
+            return self._create_cached_handle(dry_run_result)
+
+        # 4. Check cache
+        if cached_result := self._try_get_cached_result(has_mutation, final_sql, opts):
+            return self._create_cached_handle(cached_result)
+
+        # 5. Create Query model for audit
+        query = self._create_query_record(
+            final_sql, opts, catalog, schema, status="pending"
+        )
+
+        # 6. Submit to Celery
+        self._submit_query_to_celery(query, final_sql, opts)
+
+        # 7. Create and return handle with bound methods
+        return self._create_async_handle(query.id)
+
+    def _prepare_sql(
+        self,
+        sql: str,
+        opts: QueryOptions,
+    ) -> tuple[SQLScript, str | None, str | None]:
+        """
+        Prepare SQL for execution (no side effects, no security checks).
+
+        This method performs SQL preprocessing:
+        1. Template rendering
+        2. SQL parsing
+        3. Catalog/schema resolution
+        4. RLS application
+        5. Limit application (if not mutation)
+
+        Security checks (disallowed functions, DML permission) are performed
+        by the caller after receiving the prepared script.
+
+        :param sql: Original SQL query
+        :param opts: Query options
+        :returns: Tuple of (prepared SQLScript, catalog, schema)
+        """
+        # 1. Render Jinja2 templates
+        rendered_sql = self._render_sql_template(sql, opts.template_params)
+
+        # 2. Parse SQL with SQLScript
+        script = SQLScript(rendered_sql, self.database.db_engine_spec.engine)
+
+        # 3. Get catalog and schema
+        catalog = opts.catalog or self.database.get_default_catalog()
+        schema = opts.schema or self.database.get_default_schema(catalog)
+
+        # 4. Apply RLS directly to script statements
+        self._apply_rls_to_script(script, catalog, schema)
+
+        # 5. Apply limit only if not a mutation
+        if not script.has_mutation():
+            self._apply_limit_to_script(script, opts)
+
+        return script, catalog, schema
+
+    def _check_security(self, script: SQLScript) -> None:
+        """
+        Perform security checks on prepared SQL script.
+
+        :param script: Prepared SQLScript
+        :raises SupersetSecurityException: If security checks fail
+        """
+        # Check disallowed functions
+        if disallowed := self._check_disallowed_functions(script):
+            raise SupersetSecurityException(
+                SupersetError(
+                    message=f"Disallowed SQL functions: {', 
'.join(disallowed)}",
+                    error_type=SupersetErrorType.INVALID_SQL_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            )
+
+        # Check DML permission
+        if script.has_mutation() and not self.database.allow_dml:
+            raise SupersetSecurityException(
+                SupersetError(
+                    message="DML queries are not allowed on this database",
+                    error_type=SupersetErrorType.DML_NOT_ALLOWED_ERROR,
+                    level=ErrorLevel.ERROR,
+                )
+            )
+
+    def _execute_statements(
+        self,
+        sql: str,
+        catalog: str | None,
+        schema: str | None,
+        query: Any,
+    ) -> list[Any]:
+        """
+        Execute SQL statements and return per-statement results.
+
+        Progress is tracked via Query.progress field.
+        Uses the same execution path for both single and multi-statement queries.
+
+        :param sql: Final SQL to execute (with RLS and all transformations applied)
+        :param catalog: Catalog name
+        :param schema: Schema name
+        :param query: Query model for progress tracking
+        :returns: List of StatementResult objects
+        """
+        from superset_core.api.types import StatementResult
+
+        # Parse the final SQL (with RLS applied) to get statements
+        script = SQLScript(sql, self.database.db_engine_spec.engine)
+        statements = script.statements
+
+        # Handle empty script
+        if not statements:
+            return []
+
+        results_list = []
+
+        # Use consistent execution path for all queries
+        with self.database.get_raw_connection(catalog=catalog, schema=schema) as conn:
+            cursor = conn.cursor()
+
+            execution_results = execute_sql_with_cursor(
+                database=self.database,
+                cursor=cursor,
+                statements=[stmt.format() for stmt in statements],
+                query=query,
+                log_query_fn=self._log_query,
+            )
+
+            # Build StatementResult for each executed statement
+            for stmt_sql, result_set, exec_time, rowcount in execution_results:
+                if result_set is not None:
+                    # SELECT statement
+                    df = result_set.to_pandas_df()
+                    stmt_result = StatementResult(
+                        statement=stmt_sql,
+                        data=df,
+                        row_count=len(df),
+                        execution_time_ms=exec_time,
+                    )
+                else:
+                    # DML statement - no data, just row count
+                    stmt_result = StatementResult(
+                        statement=stmt_sql,
+                        data=None,
+                        row_count=rowcount,
+                        execution_time_ms=exec_time,
+                    )
+
+                results_list.append(stmt_result)
+
+        return results_list
+
+    def _log_query(
+        self,
+        sql: str,
+        schema: str | None,
+    ) -> None:
+        """
+        Log query using QUERY_LOGGER config.
+
+        :param sql: SQL to log
+        :param schema: Schema name
+        """
+        from superset import security_manager
+
+        if log_query := app.config.get("QUERY_LOGGER"):
+            log_query(
+                self.database,
+                sql,
+                schema,
+                __name__,
+                security_manager,
+                {},
+            )
+
+    def _create_error_result(
+        self,
+        status: Any,
+        error_message: str,
+        sql: str,
+        start_time: float,
+        partial_results: list[Any] | None = None,
+    ) -> QueryResult:
+        """
+        Create a QueryResult for error cases.
+
+        :param status: QueryStatus enum value
+        :param error_message: Error message to include
+        :param sql: SQL query (original if error occurred before transformation)
+        :param start_time: Start time for calculating execution duration
+        :param partial_results: Optional list of StatementResult from successful
+            statements before the failure
+        :returns: QueryResult with error status
+        """
+        from superset_core.api.types import QueryResult as QueryResultType
+
+        return QueryResultType(
+            status=status,
+            statements=partial_results or [],
+            error_message=error_message,
+            total_execution_time_ms=(time.time() - start_time) * 1000,
+        )
+
+    def _render_sql_template(
+        self, sql: str, template_params: dict[str, Any] | None
+    ) -> str:
+        """
+        Render Jinja2 template with params.
+
+        :param sql: SQL string potentially containing Jinja2 templates
+        :param template_params: Parameters to pass to the template
+        :returns: Rendered SQL string
+        """
+        if template_params is None:
+            return sql
+
+        from superset.jinja_context import get_template_processor
+
+        tp = get_template_processor(database=self.database)
+        return tp.process_template(sql, **template_params)
+
+    def _apply_limit_to_script(self, script: SQLScript, opts: QueryOptions) -> None:
+        """
+        Apply limit to the last statement in the script in place.
+
+        :param script: SQLScript object to modify
+        :param opts: Query options
+        """
+        # Skip if no limit requested
+        if opts.limit is None:
+            return
+
+        sql_max_row = app.config.get("SQL_MAX_ROW")
+        effective_limit = opts.limit
+        if sql_max_row and opts.limit > sql_max_row:
+            effective_limit = sql_max_row
+
+        # Apply limit to last statement only
+        if script.statements:
+            script.statements[-1].set_limit_value(
+                effective_limit,
+                self.database.db_engine_spec.limit_method,
+            )
+
+    def _try_get_cached_result(
+        self,
+        has_mutation: bool,
+        sql: str,
+        opts: QueryOptions,
+    ) -> QueryResult | None:
+        """
+        Try to get a cached result if conditions allow.
+
+        :param has_mutation: Whether the query contains mutations (DML)
+        :param sql: SQL query
+        :param opts: Query options
+        :returns: Cached QueryResult or None
+        """
+        if has_mutation or (opts.cache and opts.cache.force_refresh):
+            return None
+
+        return self._get_from_cache(sql, opts)
+
+    def _check_disallowed_functions(self, script: SQLScript) -> set[str] | None:
+        """
+        Check for disallowed SQL functions.
+
+        :param script: Parsed SQL script
+        :returns: Set of disallowed functions found, or None if none found
+        """
+        disallowed_config = app.config.get("DISALLOWED_SQL_FUNCTIONS", {})
+        engine_name = self.database.db_engine_spec.engine
+
+        # Get disallowed functions for this engine
+        engine_disallowed = disallowed_config.get(engine_name, set())
+        if not engine_disallowed:
+            return None
+
+        # Check each statement for disallowed functions
+        found = set()
+        for statement in script.statements:
+            # Use the statement's AST to check for function calls
+            statement_str = str(statement).upper()
+            for func in engine_disallowed:
+                if func.upper() in statement_str:
+                    found.add(func)
+
+        return found if found else None
+
+    def _apply_rls_to_script(
+        self, script: SQLScript, catalog: str | None, schema: str | None
+    ) -> None:
+        """
+        Apply Row-Level Security to SQLScript statements in place.
+
+        :param script: SQLScript object to modify
+        :param catalog: Catalog name
+        :param schema: Schema name
+        """
+        from superset.utils.rls import apply_rls
+
+        # Apply RLS to each statement in the script
+        for statement in script.statements:
+            apply_rls(self.database, catalog, schema or "", statement)
+
+    def _create_query_record(
+        self,
+        sql: str,
+        opts: QueryOptions,
+        catalog: str | None,
+        schema: str | None,
+        status: str = "running",
+    ) -> Any:
+        """
+        Create Query model for audit/tracking.
+
+        :param sql: SQL to execute
+        :param opts: Query options
+        :param catalog: Catalog name
+        :param schema: Schema name
+        :param status: Initial query status ("running" for sync, "pending" for async)
+        :returns: Query model instance
+        """
+        import uuid
+
+        from superset.models.sql_lab import Query as QueryModel
+
+        user_id = None
+        if has_app_context() and hasattr(g, "user") and g.user:
+            user_id = g.user.get_id()
+
+        # Generate client_id for Query model
+        client_id = uuid.uuid4().hex[:11]
+
+        query = QueryModel(
+            client_id=client_id,
+            database_id=self.database.id,
+            sql=sql,
+            catalog=catalog,
+            schema=schema,
+            user_id=user_id,
+            status=status,
+            limit=opts.limit,
+        )
+        db.session.add(query)
+        db.session.commit()  # pylint: disable=consider-using-transaction
+
+        return query
+
+    def _get_from_cache(self, sql: str, opts: QueryOptions) -> QueryResult | None:
+        """
+        Check results cache for existing result.
+
+        :param sql: SQL query
+        :param opts: Query options
+        :returns: Cached QueryResult if found, None otherwise
+        """
+        from superset_core.api.types import (
+            QueryResult as QueryResultType,
+            QueryStatus,
+            StatementResult,
+        )
+
+        cache_key = self._generate_cache_key(sql, opts)
+
+        if (cached := cache_manager.data_cache.get(cache_key)) is not None:
+            # Reconstruct statement results from cached data
+            statements = [
+                StatementResult(
+                    statement=stmt_data["statement"],
+                    data=stmt_data["data"],
+                    row_count=stmt_data["row_count"],
+                    execution_time_ms=stmt_data["execution_time_ms"],
+                )
+                for stmt_data in cached.get("statements", [])
+            ]
+            return QueryResultType(
+                status=QueryStatus.SUCCESS,
+                statements=statements,
+                is_cached=True,

Review Comment:
   **Suggestion:** Cached QueryResult returned from `_get_from_cache` does not 
include `query_id`, while other places populate `query_id` in QueryResult 
objects; include `query_id=None` to keep the returned object shape consistent 
and avoid KeyError or contract mismatch downstream. [possible bug]
   
   **Severity Level:** Critical 🚨
   ```suggestion
                   query_id=None,
   ```
   <details>
   <summary><b>Why it matters? ⭐ </b></summary>
   
   Reasonable consistency improvement — other code paths often include 
query_id, and explicitly setting query_id=None makes the returned object's 
shape consistent and avoids surprises in consumers that expect the attribute to 
exist. It's a harmless, backward-compatible change.
   </details>
   <details>
   <summary><b>Prompt for AI Agent 🤖 </b></summary>
   
   ```mdx
   This is a comment left during a code review.
   
   **Path:** superset/sql/execution/executor.py
   **Line:** 745:745
   **Comment:**
        *Possible Bug: Cached QueryResult returned from `_get_from_cache` does 
not include `query_id`, while other places populate `query_id` in QueryResult 
objects; include `query_id=None` to keep the returned object shape consistent 
and avoid KeyError or contract mismatch downstream.
   
   Validate the correctness of the flagged issue. If correct, how can I resolve 
this? If you propose a fix, implement it and please make it concise.
   ```
   </details>
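
For illustration, a rough sketch of the consistent cache-hit shape, using a stand-in dataclass instead of the real `superset_core.api.types.QueryResult` (whose exact field set is assumed here, not taken from the PR):

```python
# Illustrative stand-in for the cache-hit return shape; field names are assumed.
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class QueryResult:
    status: str
    statements: list[Any] = field(default_factory=list)
    query_id: int | None = None
    total_execution_time_ms: float = 0.0
    is_cached: bool = False


def build_cached_result(statements: list[Any]) -> QueryResult:
    # Cache hits have no backing Query row, so query_id is set to None
    # explicitly rather than left to whatever the constructor defaults to.
    return QueryResult(
        status="success",
        statements=statements,
        query_id=None,
        is_cached=True,
    )


result = build_cached_result([])
assert result.query_id is None and result.is_cached
```

Passing `query_id=None` explicitly keeps the cache-hit branch aligned with the sync and async paths that do populate the field.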



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

