diogosilva30 commented on PR #65943:
URL: https://github.com/apache/airflow/pull/65943#issuecomment-4440401632
> @jscheffl
>
> What do you think about changing the edge worker to handle task the same
> way as LocalExecutor? At initial startup, pre-create a persistent process (like
> fork pool) sized to concurrency and dispatch workloads through a queue. If we
> set up the pool before the asyncio loop starts, this issue shouldn't occur
> either.

@wjddn279 yeah, a pre-forked pool would sidestep the race (fork before
asyncio means single-threaded children), but I'm not sure it's the right shape
for the edge worker.

My hesitation with a persistent pool:
* Leaks, stale connections, open fds all accumulate across task runs. Right
now every task gets a clean process and that isolation earns its keep.
* If a pool worker dies mid-task, you have to detect it, restart it, and
reconcile with the Edge API. Per-task processes just... handle that.
* Plugin hot reload via git-sync breaks until a worker recycles since
long-lived workers hold onto their imported code.
* It touches the core execution model (dispatcher, queueing, lifecycle,
supervision). Way bigger blast radius than this PR warrants.

**What about `multiprocessing.forkserver`?**

stdlib already ships the right primitive here. One forkserver process is
started at startup, before asyncio, stays single-threaded for the worker's
lifetime, and is the only thing that ever calls `fork()`. Tasks are still fresh
COW children, just cloned from a clean forkserver instead of from the
multi-threaded asyncio main.
```
T0  main starts
    mp.set_forkserver_preload([...])
    ctx = mp.get_context("forkserver")

T1  main forks ONCE (still single-threaded)
      |                          |
      v                          v
    main process              forkserver process
    starts asyncio            imports preload modules
    becomes multi-threaded    stays single-threaded forever

on each task:
    ctx.Process(target=supervise).start()
      -> forkserver does fork()  (safe, no threads)
      -> child runs supervise()
```
Call-site diff is basically nothing:
```python
# before
p = Process(target=supervise, args=(job, child_conn))
p.start()  # fork() from the multi-threaded asyncio main process

# after (ctx is the forkserver context created at startup)
p = ctx.Process(target=supervise, args=(job, child_conn))
p.start()  # fork() happens inside the single-threaded forkserver
```
The real constraint is ordering: the forkserver has to be up and running before
the asyncio loop or any other thread starts. That's a small CLI entry point change, not
a redesign.
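
A minimal sketch of that ordering, to make the idea concrete. `supervise()` here is a stand-in and `make_forkserver_context()` / `run_job()` are hypothetical helpers, not the edge worker's actual code; the `"json"` preload is a placeholder for whatever heavy modules would really be preloaded. The warm-up process is one way to force the forkserver to launch early, since it otherwise starts lazily on the first `start()`.

```python
import asyncio
import multiprocessing as mp
import os
from typing import List, Optional


def supervise(job: str) -> None:
    """Hypothetical stand-in for the real per-task supervisor."""
    print(f"job={job} pid={os.getpid()} parent={os.getppid()}")


def make_forkserver_context(preload: List[str]):
    """Create the forkserver context (call this before any threads exist)."""
    ctx = mp.get_context("forkserver")  # raises ValueError where unsupported
    # Modules named here are imported once inside the forkserver process,
    # so every task child inherits them copy-on-write.
    ctx.set_forkserver_preload(preload)
    return ctx


async def run_job(ctx, job: str) -> Optional[int]:
    """Start one task from inside the event loop; the fork happens elsewhere."""
    p = ctx.Process(target=supervise, args=(job,))
    p.start()  # asks the forkserver to fork(); this process does not fork
    await asyncio.to_thread(p.join)  # join() blocks, so keep it off the loop
    return p.exitcode


def main() -> None:
    ctx = make_forkserver_context(["json"])  # "json" is just a placeholder
    # The forkserver is launched lazily on the first start(), so run a
    # throwaway process now, while main is still single-threaded.
    warmup = ctx.Process(target=supervise, args=("warmup",))
    warmup.start()
    warmup.join()
    # Only now bring up asyncio (and whatever threads it creates).
    exitcode = asyncio.run(run_job(ctx, "demo"))
    print(f"demo exit code: {exitcode}")


if __name__ == "__main__":
    main()
```

The printed parent pid of each child should be the forkserver's pid rather than the main process's, which is a quick way to check who actually forked.
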
Memory at 5 concurrent tasks:

|                   | Current fork | Subprocess (this PR) | Forkserver   |
|-------------------|--------------|----------------------|--------------|
| Per-task overhead | ~13 MiB COW  | ~98 MiB              | ~15 MiB COW  |
| Fixed overhead    | 0            | 0                    | ~150 MiB     |
| Fork safety       | unsafe       | safe                 | safe         |
| Total for 5 tasks | ~65 MiB      | **~490 MiB**         | **~225 MiB** |

Near-COW memory cost, same safety as subprocess, without the per-task RSS
hit you called out.
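
For anyone who wants to plug in a different concurrency, the totals in the table are just fixed overhead plus per-task overhead times the task count. A small sketch using the approximate figures from the table (the helper names are made up for illustration):

```python
from typing import Optional

# Approximate figures from the table above, in MiB.
PER_TASK_MIB = {"fork": 13, "subprocess": 98, "forkserver": 15}
FIXED_MIB = {"fork": 0, "subprocess": 0, "forkserver": 150}


def total_mib(strategy: str, tasks: int) -> int:
    """Fixed overhead plus per-task overhead for `tasks` concurrent tasks."""
    return FIXED_MIB[strategy] + PER_TASK_MIB[strategy] * tasks


def first_cheaper_concurrency(a: str, b: str, limit: int = 64) -> Optional[int]:
    """Smallest task count (up to `limit`) where strategy a uses less than b."""
    for n in range(1, limit + 1):
        if total_mib(a, n) < total_mib(b, n):
            return n
    return None


if __name__ == "__main__":
    for strategy in PER_TASK_MIB:
        print(strategy, total_mib(strategy, 5))  # 65, 490, 225
    print(first_cheaper_concurrency("forkserver", "subprocess"))  # 2
```

With these numbers forkserver is the more expensive option for a single task (165 vs 98 MiB) and comes out ahead of subprocess from two concurrent tasks up.
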
One caveat: `forkserver` is POSIX-only. Windows only supports `spawn`. That
said, the current `os.fork()` path doesn't work on Windows either, so it's not a
regression. A Windows follow-up would just fall back to `spawn` (same profile
as this PR).
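
Roughly what that fallback could look like, as a sketch. `pick_start_method()` is a hypothetical helper; `multiprocessing.get_all_start_methods()` is the stdlib call that reports what the current platform supports.

```python
import multiprocessing as mp
from typing import List, Optional


def pick_start_method(available: Optional[List[str]] = None) -> str:
    """Prefer forkserver where the platform offers it, else fall back to spawn."""
    methods = mp.get_all_start_methods() if available is None else available
    return "forkserver" if "forkserver" in methods else "spawn"


if __name__ == "__main__":
    method = pick_start_method()
    ctx = mp.get_context(method)
    print(f"using start method: {ctx.get_start_method()}")
```

The `available` parameter exists only so the choice can be exercised without running on each platform.
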
Potential order:
1. Merge this as-is. Small, opt-in, seems to fix the deadlock issue I'm
seeing.
2. Follow-up: design and discuss forkserver behind
`execute_tasks_via_forkserver=True`, same opt-in model.
3. Once it's had some bake time, flip the default in a later 3.x and
deprecate the direct fork.
Happy to draft that follow-up if there's appetite for it. @jscheffl curious
what you think.