ashb commented on code in PR #23944:
URL: https://github.com/apache/airflow/pull/23944#discussion_r1850499129
##########
airflow/executors/local_executor.py:
##########
@@ -403,32 +202,91 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Execute asynchronously."""
- if TYPE_CHECKING:
- assert self.impl
-
self.validate_airflow_tasks_run_command(command)
+ self.activity_queue.put((key, command))
+ self._outstanding_messages += 1
+ self._check_workers(can_start=True)
+
+ def _check_workers(self, can_start: bool = True):
+ # Reap any dead workers
+ to_remove = set()
+ for pid, proc in self.workers.items():
+ if not proc.is_alive():
+ to_remove.add(pid)
+ proc.close()
+
+ if to_remove:
+ self.workers = {pid: proc for pid, proc in self.workers.items() if
pid not in to_remove}
+
+ # If we're using spawn in multiprocessing (default on macos now) to
start tasks, this can get called a
+ # via sync() a few times before the spawned process actually starts
picking up messages. Try not to
+ # create too much
+
+ if self._outstanding_messages <= 0 or self.activity_queue.empty():
+ # Nothing to do, should we shut down idle workers?
+ return
- self.impl.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
+ need_more_workers = len(self.workers) < self._outstanding_messages
+ if need_more_workers and (self.parallelism == 0 or len(self.workers) <
self.parallelism):
+ self._spawn_worker()
+
+ def _spawn_worker(self):
+ p = multiprocessing.Process(
+ target=_run_worker,
+ kwargs={
+ "logger_name": self.log.name,
+ "input": self.activity_queue,
+ "output": self.result_queue,
+ },
+ )
+ p.start()
+ if TYPE_CHECKING:
+ assert p.pid # Since we've called start
+ self.workers[p.pid] = p
def sync(self) -> None:
"""Sync will get called periodically by the heartbeat method."""
- if TYPE_CHECKING:
- assert self.impl
+ self._read_results()
+ self._check_workers()
- self.impl.sync()
+ def _read_results(self):
+ while not self.result_queue.empty():
+ key, state, exc = self.result_queue.get()
+ self._outstanding_messages = self._outstanding_messages - 1
+
+ if exc:
+ # TODO: This needs a better stacktrace, it appears from here
+ if hasattr(exc, "add_note"):
+ exc.add_note("(This stacktrace is incorrect -- the
exception came from a subprocess)")
Review Comment:
Planning to leave this -- it happens so rarely (as the `_execute_work` fns
have try/except), so this is just a final line of defense to show something
when the task goes wrong.
(The only way I was able to trigger this was a syntax error when developing
it.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]