ashb commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r2993795009
##########
task-sdk/src/airflow/sdk/execution_time/supervisor.py:
##########
@@ -2120,3 +2120,99 @@ def supervise(
if close_client and client:
with suppress(Exception):
client.close()
+
+
+def supervise_workload(
+ workload: ExecutorWorkload,
+ *,
+ server: str | None = None,
+ dry_run: bool = False,
+ client: Client | None = None,
+ subprocess_logs_to_stdout: bool = False,
+ proctitle: str | None = None,
+) -> int:
+ """
+ Run any workload type to completion in a supervised subprocess.
+
+ Dispatch to the appropriate supervisor based on workload type. Workload-specific
+ attributes (log_path, sentry_integration, bundle_info, etc.) are read from the
+ workload object itself.
+
+ :param workload: The ``ExecutorWorkload`` to execute.
+ :param server: Base URL of the API server (used by task workloads).
+ :param dry_run: If True, execute without actual task execution (simulate run).
+ :param client: Optional preconfigured client for communication with the server.
+ :param subprocess_logs_to_stdout: Should task logs also be sent to stdout via the main logger.
+ :param proctitle: Process title to set for this workload. If not provided, defaults to
+ ``"airflow supervisor: <workload.display_name>"``. Executors may pass a custom title
+ that includes executor-specific context (e.g. team name).
+ :return: Exit code of the process.
+ """
+ # Imports deferred to avoid an SDK/core dependency at module load time.
+ from airflow.executors.workloads.callback import ExecuteCallback
+ from airflow.executors.workloads.task import ExecuteTask
Review Comment:
Argh. This has just made me realise something.
Previously, the `supervise` function deliberately didn't take a workload object: the
definition of the workload structure must be kept in sync between the Executor and the
worker code that picks up tasks (Celery worker, edge worker, etc.), but _specifically_
not the Task SDK.
One of the design goals of the SDK is that v1.0.0 of the SDK must be
"wire compatible" with future versions of Airflow core. To put it another
way, I should be able to keep running a worker on Airflow Task SDK 1.0.0
against an Airflow 3.2.0 API server + scheduler without changing a single thing about
the worker deployment/pod/venv etc.
By having workload definitions used here, this rule could very easily be
broken in a point release, defeating one of the key goals of the Core/TaskSDK
separation.
I'm wondering if instead this generic supervise function should live in
BaseExecutor? That's not really right either, but it is at least already in the
shared path between the Executor/scheduler and the code that runs on the worker (at
various points over the past year we have talked about having a new
`apache-airflow-base-executor` dist).
I'm really sorry I missed this on the earlier review. That's on me.
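For reference, the dispatch shape the new `supervise_workload` docstring describes can be sketched roughly as below. All names here are illustrative stand-ins, not the real `airflow.executors.workloads` classes, and the per-type supervisor bodies are stubbed out; the point is only the isinstance-based routing that couples this function to the workload definitions:

```python
from dataclasses import dataclass


# Hypothetical stand-ins for the workload classes the diff imports
# (ExecuteTask / ExecuteCallback); fields are illustrative only.
@dataclass
class ExecuteTask:
    display_name: str


@dataclass
class ExecuteCallback:
    display_name: str


def supervise_workload(workload) -> int:
    """Route the workload to a per-type supervisor and return its exit code."""
    if isinstance(workload, ExecuteTask):
        return _supervise_task(workload)
    if isinstance(workload, ExecuteCallback):
        return _supervise_callback(workload)
    raise TypeError(f"Unsupported workload type: {type(workload).__name__}")


def _supervise_task(workload: ExecuteTask) -> int:
    # Placeholder for the real task supervisor loop.
    return 0


def _supervise_callback(workload: ExecuteCallback) -> int:
    # Placeholder for the real callback supervisor loop.
    return 0
```

It is exactly this routing on concrete workload classes that creates the versioning coupling raised above: any change to those classes must stay wire compatible with workers pinned to an older Task SDK.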
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]