villebro commented on code in PR #36368:
URL: https://github.com/apache/superset/pull/36368#discussion_r2738512533


##########
superset/models/tasks.py:
##########
@@ -0,0 +1,368 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Task model for Global Task Framework (GTF)"""
+
+from __future__ import annotations
+
+import uuid
+from datetime import datetime, timezone
+from typing import Any, cast
+
+from flask_appbuilder import Model
+from sqlalchemy import (
+    Column,
+    DateTime,
+    Integer,
+    String,
+    Text,
+)
+from sqlalchemy.orm import relationship
+from superset_core.api.models import Task as CoreTask
+from superset_core.api.tasks import TaskProperties, TaskStatus
+
+from superset.models.helpers import AuditMixinNullable
+from superset.models.task_subscribers import TaskSubscriber
+from superset.tasks.utils import (
+    error_update,
+    get_finished_dedup_key,
+    parse_properties,
+    serialize_properties,
+)
+from superset.utils import json
+
+
+class Task(CoreTask, AuditMixinNullable, Model):
+    """
+    Concrete Task model for the Global Task Framework (GTF).
+
+    This model represents async tasks in Superset, providing unified tracking
+    for all background operations including SQL queries, thumbnail generation,
+    reports, and other async operations.
+
+    Non-filterable fields (progress, error info, execution config) are stored
+    in a `properties` JSON blob for schema flexibility.
+    """
+
+    __tablename__ = "tasks"
+
+    # Primary key and identifiers
+    id = Column(Integer, primary_key=True)
+    uuid = Column(
+        String(36), nullable=False, unique=True, default=lambda: 
str(uuid.uuid4())
+    )
+
+    # Task metadata (filterable)
+    task_key = Column(String(256), nullable=False, index=True)  # For 
deduplication
+    task_type = Column(String(100), nullable=False, index=True)  # e.g., 
'sql_execution'
+    task_name = Column(String(256), nullable=True)  # Human readable name
+    scope = Column(
+        String(20), nullable=False, index=True, default="private"
+    )  # private/shared/system
+    status = Column(
+        String(50), nullable=False, index=True, 
default=TaskStatus.PENDING.value
+    )
+    dedup_key = Column(
+        String(512), nullable=False, unique=True, index=True
+    )  # Computed deduplication key
+
+    # Timestamps
+    started_at = Column(DateTime, nullable=True)
+    ended_at = Column(DateTime, nullable=True)
+
+    # User context for execution
+    user_id = Column(Integer, nullable=True)
+
+    # Task-specific output data (set by task code via 
ctx.update_task(payload=...))
+    payload = Column(Text, nullable=True, default="{}")
+
+    # Properties JSON blob - contains runtime state and execution config:
+    # - is_abortable: bool - has abort handler registered
+    # - progress_percent: float - progress 0.0-1.0
+    # - progress_current: int - current iteration count
+    # - progress_total: int - total iterations
+    # - error_message: str - human-readable error message
+    # - exception_type: str - exception class name
+    # - stack_trace: str - full formatted traceback
+    # - timeout: int - timeout in seconds
+    _properties = Column("properties", Text, nullable=True, default="{}")
+
+    # Transient cache for parsed properties (not persisted)
+    _properties_cache: TaskProperties | None = None
+
+    # Relationships
+    subscribers = relationship(
+        TaskSubscriber,
+        back_populates="task",
+        cascade="all, delete-orphan",
+    )
+
+    def __repr__(self) -> str:
+        return f"<Task {self.task_type}:{self.task_key} [{self.status}]>"
+
+    # -------------------------------------------------------------------------
+    # Properties accessor
+    # -------------------------------------------------------------------------
+
+    @property
+    def properties(self) -> TaskProperties:
+        """
+        Get typed properties (cached for performance).
+
+        Properties contain runtime state and execution config that doesn't
+        need database filtering. Parsed once and cached until next write.
+
+        Always use .get() for reads since keys may be absent.
+
+        :returns: TaskProperties dict (sparse - only contains keys that were 
set)
+        """
+        if self._properties_cache is None:
+            self._properties_cache = parse_properties(self._properties)
+        return self._properties_cache
+
+    def update_properties(self, updates: TaskProperties) -> None:
+        """
+        Update specific properties fields (merge semantics).
+
+        Only updates fields present in the updates dict.
+
+        :param updates: TaskProperties dict with fields to update
+
+        Example:
+            task.update_properties({"is_abortable": True})
+            task.update_properties(progress_update((50, 100)))
+        """
+        current = cast(TaskProperties, dict(self.properties))
+        current.update(updates)  # Merge updates
+        self._properties = serialize_properties(current)
+        self._properties_cache = current  # Update cache
+
+    # -------------------------------------------------------------------------
+    # Payload accessor (for task-specific output data)
+    # -------------------------------------------------------------------------
+
+    def get_payload(self) -> dict[str, Any]:
+        """
+        Get payload as parsed JSON.
+
+        Payload contains task-specific output data set by task code via
+        ctx.update_task(payload=...).
+
+        :returns: Dictionary containing payload data
+        """
+        try:
+            return json.loads(self.payload or "{}")
+        except (json.JSONDecodeError, TypeError):
+            return {}
+
+    def set_payload(self, data: dict[str, Any]) -> None:
+        """
+        Update payload with new data.
+
+        The payload is merged with existing data, not replaced.
+
+        :param data: Dictionary of data to merge into payload
+        """
+        current = self.get_payload()
+        current.update(data)
+        self.payload = json.dumps(current)
+
+    # -------------------------------------------------------------------------
+    # Error handling
+    # -------------------------------------------------------------------------
+
+    def set_error_from_exception(self, exception: BaseException) -> None:
+        """
+        Set error fields from an exception.
+
+        Captures the error message, exception type, and full stack trace.
+        Called automatically by the executor when a task raises an exception.
+
+        :param exception: The exception that caused the failure
+        """
+        self.update_properties(error_update(exception))
+
+    # -------------------------------------------------------------------------
+    # Status management
+    # -------------------------------------------------------------------------
+
+    def set_status(self, status: TaskStatus | str) -> None:
+        """
+        Update task status and dedup_key.
+
+        When a task finishes (success, failure, or abort), the dedup_key is
+        changed to the task's UUID. This frees up the slot so new tasks with
+        the same parameters can be created.
+
+        :param status: New task status
+        """
+        if isinstance(status, TaskStatus):
+            status = status.value
+        self.status = status
+
+        # Update timestamps and is_abortable based on status
+        now = datetime.now(timezone.utc)
+        if status == TaskStatus.IN_PROGRESS.value and not self.started_at:
+            self.started_at = now
+            # Set is_abortable to False when task starts executing
+            # (will be set to True if/when an abort handler is registered)
+            if self.properties.get("is_abortable") is None:
+                self.update_properties({"is_abortable": False})
+        elif status in [
+            TaskStatus.SUCCESS.value,
+            TaskStatus.FAILURE.value,
+            TaskStatus.ABORTED.value,
+        ]:
+            if not self.ended_at:
+                self.ended_at = now
+            # Update dedup_key to UUID to free up the slot for new tasks
+            self.dedup_key = get_finished_dedup_key(self.uuid)
+        # Note: ABORTING status doesn't set ended_at yet - that happens when
+        # the task transitions to ABORTED after handlers complete
+
+    @property
+    def is_pending(self) -> bool:
+        """Check if task is pending."""
+        return self.status == TaskStatus.PENDING.value
+
+    @property
+    def is_running(self) -> bool:
+        """Check if task is currently running."""
+        return self.status == TaskStatus.IN_PROGRESS.value
+
+    @property
+    def is_finished(self) -> bool:
+        """Check if task has finished (success, failure, or aborted)."""
+        return self.status in [
+            TaskStatus.SUCCESS.value,
+            TaskStatus.FAILURE.value,
+            TaskStatus.ABORTED.value,
+        ]
+
+    @property
+    def is_successful(self) -> bool:
+        """Check if task completed successfully."""
+        return self.status == TaskStatus.SUCCESS.value
+
+    @property
+    def duration_seconds(self) -> float | None:

Review Comment:
   `created_on` comes from the `AuditMixinNullable` mixin, which is used by many
models. Normalizing it here would be challenging, because the same change would
then have to be made to every model built on that mixin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to