michael-s-molina commented on code in PR #36368: URL: https://github.com/apache/superset/pull/36368#discussion_r2673261118
##########
docs/developer_portal/async-tasks.md:
##########

@@ -0,0 +1,460 @@

---
title: Async Task Framework
sidebar_label: Async Tasks
sidebar_position: 5
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Global Async Task Framework (GATF)

The Global Async Task Framework provides a unified way to manage asynchronous tasks in Apache Superset. It handles task registration, execution, status tracking, cancellation, and deduplication.

## Overview

GATF uses the **ambient context pattern**, where tasks access their execution context via `get_context()` instead of receiving it as a parameter. This results in clean, business-focused function signatures without framework boilerplate.
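The ambient context pattern itself can be illustrated in isolation with Python's `contextvars`. This is a generic sketch of the pattern, not Superset's implementation: `TaskContext`, `_current_context`, and `run_task` are hypothetical stand-ins for the framework's internals.

```python
import contextvars
from dataclasses import dataclass


# Hypothetical context object; the real framework context carries more state.
@dataclass
class TaskContext:
    task_id: str
    user: str


# Module-level context variable holding the "ambient" context.
_current_context: contextvars.ContextVar = contextvars.ContextVar("task_context")


def get_context() -> TaskContext:
    """Return the context of the currently running task."""
    return _current_context.get()


def run_task(ctx: TaskContext, func, *args):
    """Framework-side runner: sets the ambient context, then calls the task."""
    token = _current_context.set(ctx)
    try:
        return func(*args)
    finally:
        # Restore the previous context so state never leaks between tasks.
        _current_context.reset(token)


# The task body needs no context parameter:
def my_task(n: int) -> None:
    ctx = get_context()
    print(f"task {ctx.task_id} run by {ctx.user} with n={n}")


run_task(TaskContext(task_id="t-1", user="admin"), my_task, 42)
# prints: task t-1 run by admin with n=42
```

Because the context variable is set and reset by the runner, task code anywhere in the call stack can reach its own context without threading a parameter through every function.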
### Key Features

- **Clean Signatures**: Task functions contain only business arguments
- **Ambient Context**: Access context via `get_context()` - no parameter passing
- **Dual Execution**: Synchronous (for testing) and asynchronous (via Celery)
- **Optional Deduplication**: Use idempotency keys to prevent duplicate execution
- **Progressive Updates**: Update the payload and check for cancellation during execution
- **Type Safety**: Full type hints with `ParamSpec` support

## Quick Start

### Define a Task

```python
import logging

import requests

from superset_core.api.types import async_task, get_context

logger = logging.getLogger(__name__)


@async_task()
def fetch_data(api_url: str) -> None:
    """
    Example task that fetches data from an external API.

    Features:
    - Automatic cancellation check before execution
    - Simple cleanup handler
    - Cancellation checking during execution
    """
    ctx = get_context()

    # Cleanup runs automatically on success, failure, or cancellation
    @ctx.on_cleanup
    def cleanup():
        logger.info("Data fetch completed")

    # No initial check needed - the framework checks before execution!
    # Fetch data with a timeout (prevents hanging)
    response = requests.get(api_url, timeout=60)
    data = response.json()

    # Check before the next operation
    if ctx.is_cancelled():
        return

    # Process and cache the data
    process_and_cache(data)
```

### Execute Tasks Asynchronously or Synchronously

The `@async_task` decorator enables flexible execution modes:

```python
# Asynchronous execution via Celery (for production workloads)
task = long_running_task.schedule()
# Task runs in a background worker, returns immediately
print(task.status)  # "pending"

# Synchronous execution (for testing or when blocking is acceptable)
task = long_running_task()
# Task executes inline, blocks until complete
print(task.status)  # "success"
```

**When to use each mode:**
- **Async (`.schedule()`)**: Production workloads, long-running operations, non-blocking execution
- **Sync (direct call)**: Unit testing, development, or lightweight operations

## Core Concepts

### Ambient Context

Tasks access execution context via `get_context()`:

```python
@async_task()
def my_task(business_arg: int) -> None:
    ctx = get_context()  # Ambient context access
    task = ctx.task      # Task entity
    user = ctx.user      # User who dispatched the task

    task.set_payload({"arg": business_arg})
    ctx.update_task(task)
```

### Task Lifecycle

1. **PENDING**: Task created, awaiting execution
2. **IN_PROGRESS**: Currently executing
3. **SUCCESS**: Completed successfully
4. **FAILURE**: Failed with an error
5. **CANCELLED**: Cancelled before/during execution

### Deduplication

By default, each task gets a random UUID.
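When explicit deduplication is needed, a deterministic key can be derived from the task name and its arguments. The sketch below shows one common way to build such a key; `make_idempotency_key` is a hypothetical helper, not part of the GATF API.

```python
import hashlib
import json


def make_idempotency_key(task_name: str, **kwargs) -> str:
    """Derive a deterministic key from the task name and its arguments."""
    # sort_keys guarantees the same arguments always produce the same digest,
    # regardless of the order in which keyword arguments were passed.
    canonical = json.dumps({"task": task_name, "args": kwargs}, sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:32]


# Identical inputs always yield the same key, so repeated schedules deduplicate.
key = make_idempotency_key("fetch_data", api_url="https://example.com/api")
```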
Use `idempotency_key` for explicit deduplication:

```python
# Without deduplication - creates a new task each time
task1 = my_task.schedule(arg=1)
task2 = my_task.schedule(arg=1)  # Separate task

# With deduplication - returns the existing task if it is still active
task1 = my_task.schedule(arg=1, options=TaskOptions(idempotency_key="key"))
task2 = my_task.schedule(arg=1, options=TaskOptions(idempotency_key="key"))
# task2 is the same as task1 if task1 is PENDING or IN_PROGRESS
```

## Cancellation Support

The framework provides built-in cancellation support with minimal boilerplate.

### Cleanup Handlers

Register cleanup functions that run automatically when a task ends (success, failure, or cancellation):

```python
@async_task()
def my_task() -> None:
    ctx = get_context()

    @ctx.on_cleanup
    def cleanup():
        """Runs automatically when the task ends"""
        logger.info("Task completed")
```

**Multiple cleanup handlers** (execute in LIFO order):

```python
@ctx.on_cleanup
def cleanup_cache():
    cache.clear()

@ctx.on_cleanup
def cleanup_log():
    logger.info("Done")
```

### Automatic Pre-Execution Check

**The framework automatically checks whether a task was cancelled before execution starts.** You don't need an initial `if ctx.is_cancelled()` check - just start working!

### Checking During Execution

Check for cancellation at key points during execution:

```python
@async_task()
def process_items(items: list[int]) -> None:
    ctx = get_context()

    @ctx.on_cleanup
    def cleanup():
        logger.info("Processing ended")

    # No initial check needed - the framework handles it!
    for i, item in enumerate(items):
        # Check every 10 items
        if i % 10 == 0 and ctx.is_cancelled():
            return

        process_single_item(item)
```

### Using Helper Methods

**`ctx.run()` - Pre-check wrapper (optional):**

```python
@async_task()
def fetch_and_process(api_url: str) -> None:
    ctx = get_context()

    @ctx.on_cleanup
    def cleanup():
        logger.info("Fetch completed")

    # The helper checks for cancellation before executing
    response = ctx.run(lambda: requests.get(api_url, timeout=60))
    if response is None:
        return  # Cancelled

    data = ctx.run(lambda: response.json())
    if data is None:
        return

    cache.set("data", data)
```

**`ctx.update_progress()` - Progress tracking with cancellation check:**

```python
@async_task()
def process_batch(item_ids: list[int]) -> None:
    ctx = get_context()

    for i, item_id in enumerate(item_ids):
        # Combined progress update + cancellation check
        if not ctx.update_progress(i + 1, len(item_ids)):
            return  # Cancelled

        process_single_item(item_id)
```

## Advanced Usage

### Complete Example: API Fetch with Cleanup

```python
@async_task()
def fetch_and_cache(api_url: str, chart_id: int) -> None:
    """Fetch from an external API and cache the results."""
    ctx = get_context()
    cache_key = f"chart_{chart_id}_data"

    @ctx.on_cleanup
    def cleanup():
        if ctx.is_cancelled():
            # Clear the partial cache on cancellation
            cache.delete(cache_key)
            logger.info(f"Fetch cancelled, cleared cache: {cache_key}")
        else:
            logger.info(f"Fetch completed: {cache_key}")

    # Fetch with a timeout (prevents hanging)
    response = requests.get(api_url, timeout=60)
    data = response.json()

    # Check before expensive processing
    if ctx.is_cancelled():
        return

    processed = process_data(data)
    cache.set(cache_key, processed)
```

### Progressive Updates

Update progress and check cancellation simultaneously:

```python
@async_task()
def multi_step_task(item_ids: list[int]) -> None:
    ctx = get_context()
    @ctx.on_cleanup
    def cleanup():
        logger.info(f"Processed {ctx.task.get_payload().get('count', 0)} items")

    for i, item_id in enumerate(item_ids):
        # Update progress and check cancellation
        if not ctx.update_progress(
            i + 1,
            len(item_ids),
            current_item=item_id
        ):
            return  # Cancelled

        process_item(item_id)

    ctx.task.set_payload({"count": len(item_ids)})
```

Review Comment:
   Should we make this explicit in the documentation (functions with the `@async_task` annotation should not have return values)?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
