kaxil commented on code in PR #23944:
URL: https://github.com/apache/airflow/pull/23944#discussion_r1850467773
##########
airflow/executors/local_executor.py:
##########
@@ -403,32 +202,91 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Execute asynchronously."""
- if TYPE_CHECKING:
- assert self.impl
-
self.validate_airflow_tasks_run_command(command)
+ self.activity_queue.put((key, command))
+ self._outstanding_messages += 1
+ self._check_workers(can_start=True)
+
+ def _check_workers(self, can_start: bool = True):
+ # Reap any dead workers
+ to_remove = set()
+ for pid, proc in self.workers.items():
+ if not proc.is_alive():
+ to_remove.add(pid)
+ proc.close()
+
+ if to_remove:
+ self.workers = {pid: proc for pid, proc in self.workers.items() if
pid not in to_remove}
+
+ # If we're using spawn in multiprocessing (default on macos now) to
start tasks, this can get called a
+ # via sync() a few times before the spawned process actually starts
picking up messages. Try not to
+ # create too much
+
+ if self._outstanding_messages <= 0 or self.activity_queue.empty():
+ # Nothing to do, should we shut down idle workers?
Review Comment:
Worth creating a GH issue for it if someone wants to take care of it
##########
airflow/executors/local_executor.py:
##########
@@ -403,32 +202,91 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Execute asynchronously."""
- if TYPE_CHECKING:
- assert self.impl
-
self.validate_airflow_tasks_run_command(command)
+ self.activity_queue.put((key, command))
+ self._outstanding_messages += 1
+ self._check_workers(can_start=True)
+
+ def _check_workers(self, can_start: bool = True):
+ # Reap any dead workers
+ to_remove = set()
+ for pid, proc in self.workers.items():
+ if not proc.is_alive():
+ to_remove.add(pid)
+ proc.close()
+
+ if to_remove:
+ self.workers = {pid: proc for pid, proc in self.workers.items() if
pid not in to_remove}
+
+ # If we're using spawn in multiprocessing (default on macos now) to
start tasks, this can get called a
+ # via sync() a few times before the spawned process actually starts
picking up messages. Try not to
+ # create too much
+
+ if self._outstanding_messages <= 0 or self.activity_queue.empty():
+ # Nothing to do, should we shut down idle workers?
+ return
- self.impl.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
+ need_more_workers = len(self.workers) < self._outstanding_messages
+ if need_more_workers and (self.parallelism == 0 or len(self.workers) <
self.parallelism):
+ self._spawn_worker()
+
+ def _spawn_worker(self):
+ p = multiprocessing.Process(
+ target=_run_worker,
+ kwargs={
+ "logger_name": self.log.name,
+ "input": self.activity_queue,
+ "output": self.result_queue,
+ },
+ )
+ p.start()
+ if TYPE_CHECKING:
+ assert p.pid # Since we've called start
+ self.workers[p.pid] = p
def sync(self) -> None:
"""Sync will get called periodically by the heartbeat method."""
- if TYPE_CHECKING:
- assert self.impl
+ self._read_results()
+ self._check_workers()
- self.impl.sync()
+ def _read_results(self):
+ while not self.result_queue.empty():
+ key, state, exc = self.result_queue.get()
+ self._outstanding_messages = self._outstanding_messages - 1
+
Review Comment:
Is there ever a risk of this becoming inconsistent due to exceptions or
unexpected shutdowns?
##########
airflow/executors/local_executor.py:
##########
@@ -403,32 +202,91 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Execute asynchronously."""
- if TYPE_CHECKING:
- assert self.impl
-
self.validate_airflow_tasks_run_command(command)
+ self.activity_queue.put((key, command))
+ self._outstanding_messages += 1
+ self._check_workers(can_start=True)
+
+ def _check_workers(self, can_start: bool = True):
+ # Reap any dead workers
+ to_remove = set()
+ for pid, proc in self.workers.items():
+ if not proc.is_alive():
+ to_remove.add(pid)
+ proc.close()
+
+ if to_remove:
+ self.workers = {pid: proc for pid, proc in self.workers.items() if
pid not in to_remove}
+
+ # If we're using spawn in multiprocessing (default on macos now) to
start tasks, this can get called a
+ # via sync() a few times before the spawned process actually starts
picking up messages. Try not to
+ # create too much
+
+ if self._outstanding_messages <= 0 or self.activity_queue.empty():
+ # Nothing to do, should we shut down idle workers?
+ return
- self.impl.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
+ need_more_workers = len(self.workers) < self._outstanding_messages
+ if need_more_workers and (self.parallelism == 0 or len(self.workers) <
self.parallelism):
+ self._spawn_worker()
Review Comment:
If we have 2 workers and `_outstanding_messages` is 4, this is only going
to create 1 worker, so we rely on multiple `_check_workers` invocations via
sync/heartbeat to scale up. Worth adding a comment here or somewhere
##########
airflow/executors/local_executor.py:
##########
@@ -25,192 +25,139 @@
from __future__ import annotations
-import contextlib
import logging
+import multiprocessing
import os
import subprocess
-from abc import abstractmethod
-from multiprocessing import Manager, Process
-from queue import Empty
+from multiprocessing import Queue, SimpleQueue
from typing import TYPE_CHECKING, Any, Optional, Tuple
-from setproctitle import getproctitle, setproctitle
-
from airflow import settings
-from airflow.exceptions import AirflowException
from airflow.executors.base_executor import PARALLELISM, BaseExecutor
-from airflow.traces.tracer import Trace, add_span
+from airflow.traces.tracer import add_span
from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
-from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
if TYPE_CHECKING:
- from multiprocessing.managers import SyncManager
- from queue import Queue
-
from airflow.executors.base_executor import CommandType
- from airflow.models.taskinstance import TaskInstanceStateType
from airflow.models.taskinstancekey import TaskInstanceKey
# This is a work to be executed by a worker.
# It can Key and Command - but it can also be None, None which is actually
a
# "Poison Pill" - worker seeing Poison Pill should take the pill and ...
die instantly.
- ExecutorWorkType = Tuple[Optional[TaskInstanceKey], Optional[CommandType]]
+ ExecutorWorkType = Optional[Tuple[TaskInstanceKey, CommandType]]
+ TaskInstanceStateType = Tuple[TaskInstanceKey, TaskInstanceState,
Optional[Exception]]
-class LocalWorkerBase(Process, LoggingMixin):
- """
- LocalWorkerBase implementation to run airflow commands.
+def _run_worker(logger_name: str, input: SimpleQueue[ExecutorWorkType],
output: Queue[TaskInstanceStateType]):
+ import signal
- Executes the given command and puts the result into a result queue when
done, terminating execution.
+ from setproctitle import setproctitle
Review Comment:
This import is not at the top of the file for some reason, right? I
remember discussing it but can't remember why
##########
tests/executors/test_local_executor.py:
##########
@@ -44,85 +53,72 @@ def test_serve_logs_default_value(self):
assert LocalExecutor.serve_logs
@mock.patch("airflow.executors.local_executor.subprocess.check_call")
- def execution_parallelism_subprocess(self, mock_check_call, parallelism=0):
- success_command = ["airflow", "tasks", "run", "true",
"some_parameter", "2020-10-07"]
- fail_command = ["airflow", "tasks", "run", "false", "task_id",
"2020-10-07"]
+ @mock.patch("airflow.cli.commands.task_command.task_run")
+ def _test_execute(self, mock_run, mock_check_call, parallelism=1):
+ success_command = ["airflow", "tasks", "run", "success",
"some_parameter", "2020-10-07"]
+ fail_command = ["airflow", "tasks", "run", "failure", "task_id",
"2020-10-07"]
+ # We just mock both styles here, only one will be hit though
def fake_execute_command(command, close_fds=True):
if command != success_command:
raise subprocess.CalledProcessError(returncode=1, cmd=command)
else:
return 0
- mock_check_call.side_effect = fake_execute_command
-
- self._test_execute(parallelism, success_command, fail_command)
-
- @mock.patch("airflow.cli.commands.task_command.task_run")
- def execution_parallelism_fork(self, mock_run, parallelism=0):
- success_command = ["airflow", "tasks", "run", "success",
"some_parameter", "2020-10-07"]
- fail_command = ["airflow", "tasks", "run", "failure",
"some_parameter", "2020-10-07"]
-
def fake_task_run(args):
+ print(repr(args))
Review Comment:
```suggestion
```
##########
airflow/executors/local_executor.py:
##########
@@ -403,32 +202,91 @@ def execute_async(
executor_config: Any | None = None,
) -> None:
"""Execute asynchronously."""
- if TYPE_CHECKING:
- assert self.impl
-
self.validate_airflow_tasks_run_command(command)
+ self.activity_queue.put((key, command))
+ self._outstanding_messages += 1
+ self._check_workers(can_start=True)
+
+ def _check_workers(self, can_start: bool = True):
+ # Reap any dead workers
+ to_remove = set()
+ for pid, proc in self.workers.items():
+ if not proc.is_alive():
+ to_remove.add(pid)
+ proc.close()
+
+ if to_remove:
+ self.workers = {pid: proc for pid, proc in self.workers.items() if
pid not in to_remove}
+
+ # If we're using spawn in multiprocessing (default on macos now) to
start tasks, this can get called a
+ # via sync() a few times before the spawned process actually starts
picking up messages. Try not to
+ # create too much
+
+ if self._outstanding_messages <= 0 or self.activity_queue.empty():
+ # Nothing to do, should we shut down idle workers?
+ return
- self.impl.execute_async(key=key, command=command, queue=queue,
executor_config=executor_config)
+ need_more_workers = len(self.workers) < self._outstanding_messages
+ if need_more_workers and (self.parallelism == 0 or len(self.workers) <
self.parallelism):
+ self._spawn_worker()
+
+ def _spawn_worker(self):
+ p = multiprocessing.Process(
+ target=_run_worker,
+ kwargs={
+ "logger_name": self.log.name,
+ "input": self.activity_queue,
+ "output": self.result_queue,
+ },
+ )
+ p.start()
+ if TYPE_CHECKING:
+ assert p.pid # Since we've called start
+ self.workers[p.pid] = p
def sync(self) -> None:
"""Sync will get called periodically by the heartbeat method."""
- if TYPE_CHECKING:
- assert self.impl
+ self._read_results()
+ self._check_workers()
- self.impl.sync()
+ def _read_results(self):
+ while not self.result_queue.empty():
+ key, state, exc = self.result_queue.get()
+ self._outstanding_messages = self._outstanding_messages - 1
+
+ if exc:
+ # TODO: This needs a better stacktrace, it appears from here
+ if hasattr(exc, "add_note"):
+ exc.add_note("(This stacktrace is incorrect -- the
exception came from a subprocess)")
Review Comment:
Are you planning to fix it here or in a separate PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]