ferruzzi commented on code in PR #62645:
URL: https://github.com/apache/airflow/pull/62645#discussion_r2962935869


##########
airflow-core/src/airflow/executors/local_executor.py:
##########
@@ -99,73 +98,59 @@ def _run_worker(
         with unread_messages:
             unread_messages.value -= 1
 
-        # Handle different workload types
-        if isinstance(workload, workloads.ExecuteTask):
-            try:
-                _execute_work(log, workload, team_conf)
-                output.put((workload.ti.key, TaskInstanceState.SUCCESS, None))
-            except Exception as e:
-                log.exception("Task execution failed.")
-                output.put((workload.ti.key, TaskInstanceState.FAILED, e))
-
-        elif isinstance(workload, workloads.ExecuteCallback):
-            output.put((workload.callback.id, CallbackState.RUNNING, None))
-            try:
-                _execute_callback(log, workload, team_conf)
-                output.put((workload.callback.id, CallbackState.SUCCESS, None))
-            except Exception as e:
-                log.exception("Callback execution failed")
-                output.put((workload.callback.id, CallbackState.FAILED, e))
-
-        else:
-            raise ValueError(f"LocalExecutor does not know how to handle 
{type(workload)}")
-
-
-def _execute_work(log: Logger, workload: workloads.ExecuteTask, team_conf) -> 
None:
-    """
-    Execute command received and stores result state in queue.
+        try:
+            _execute_workload(log, workload, team_conf)
+            output.put((workload.key, workload.success_state, None))
+        except Exception as e:
+            log.exception("Workload execution failed.", 
workload_type=type(workload).__name__)
+            output.put((workload.key, workload.failure_state, e))
 
-    :param log: Logger instance
-    :param workload: The workload to execute
-    :param team_conf: Team-specific executor configuration
-    """
-    from airflow.sdk.execution_time.supervisor import supervise
-
-    setproctitle(f"{_get_executor_process_title_prefix(team_conf.team_name)} 
{workload.ti.id}", log)
-
-    base_url = team_conf.get("api", "base_url", fallback="/")
-    # If it's a relative URL, use localhost:8080 as the default
-    if base_url.startswith("/"):
-        base_url = f"http://localhost:8080{base_url}";
-    default_execution_api_server = f"{base_url.rstrip('/')}/execution/"
-
-    # This will return the exit code of the task process, but we don't care 
about that, just if the
-    # _supervisor_ had an error reporting the state back (which will result in 
an exception.)
-    supervise(
-        # This is the "wrong" ti type, but it duck types the same. TODO: 
Create a protocol for this.
-        ti=workload.ti,  # type: ignore[arg-type]
-        dag_rel_path=workload.dag_rel_path,
-        bundle_info=workload.bundle_info,
-        token=workload.token,
-        server=team_conf.get("core", "execution_api_server_url", 
fallback=default_execution_api_server),
-        log_path=workload.log_path,
-    )
-
-
-def _execute_callback(log: Logger, workload: workloads.ExecuteCallback, 
team_conf) -> None:
+
+def _execute_workload(log: Logger, workload: ExecutorWorkload, team_conf) -> 
None:
     """
-    Execute a callback workload.
+    Execute any workload type in a supervised subprocess.
+
+    All workload types are run in a supervised child process, providing 
process isolation,
+    stdout/stderr capture, signal handling, and crash detection.
 
     :param log: Logger instance
-    :param workload: The ExecuteCallback workload to execute
+    :param workload: The workload to execute (ExecuteTask or ExecuteCallback)
     :param team_conf: Team-specific executor configuration
     """
-    setproctitle(f"{_get_executor_process_title_prefix(team_conf.team_name)} 
{workload.callback.id}", log)
-
-    success, error_msg = execute_callback_workload(workload.callback, log)
-
-    if not success:
-        raise RuntimeError(error_msg or "Callback execution failed")
+    setproctitle(f"{_get_executor_process_title_prefix(team_conf.team_name)} 
{workload.display_name}", log)
+
+    if isinstance(workload, workloads.ExecuteTask):

Review Comment:
   I think I came up with a solution here that works.   Let me know what you 
think when you get time.
   
   Also, @anishgirianish is doing a lot of work zipping up thew two workflows 
in https://github.com/apache/airflow/pull/63491 and that should be staged to 
merge right behind this one, so I don't want to steal too much of his thunder 
there or cause him a ton of merge conflicts with relocating things right now 
when he already has that in flight.  We can direct most of the structural 
requests his way I think. (assuming you are cool with that, of course)
   
   For example, I think maybe he can split this into a base_supervisor, 
task_supervisor, and callback_supervisor along with the stuff he is already 
shuffling around and this may eventually just become `workload.supervise()` 
where each workload type directs to the appropriate supervisor class... but I 
haven't played that trough all the way, we may ruin into core/sdk conflict 
there.... but it's an idea we've discussed and he is interested in looking into.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to