abhipalsingh opened a new issue, #68693:
URL: https://github.com/apache/airflow/issues/68693

   ### Under which category would you file this issue?
   
   Airflow Core
   
   ### Apache Airflow version
   
   3.2.1
   
   ### What happened and how to reproduce it?
   
   ## What happened
    
   When the API server serves logs for a **RUNNING** task instance, it ends up 
constructing a `KubernetesExecutor` (to obtain `get_task_log`). 
`KubernetesExecutor.__init__` creates a `multiprocessing.Manager()`, which 
forks a `serve_forever` server process. The API server never runs or shuts down 
this executor, so that `Manager` process is orphaned and runs forever (~350–400 
MB resident each).
    
   Because the executor is cached per process, it's **~1 leaked `Manager` per 
gunicorn/uvicorn worker** (e.g. up to 4 per API-server pod). It gets worse with 
gunicorn worker refresh / any worker recycling: when a worker is replaced, its 
`Manager` child reparents to PID 1 and is never reaped, so orphaned `Manager`s 
accumulate over time and contribute to API-server OOMs.
    
   Confirmed with `py-spy dump --native` of a hung child (the `api-server 
gunicorn: worker` proc with `PPID` = another worker):
    
   ```
   Thread (idle): "AnyIO worker thread"
     serve_forever (multiprocessing/managers.py:176)
     _run_server (multiprocessing/managers.py:599)
     run (multiprocessing/process.py:108)
     _bootstrap (multiprocessing/process.py:314)
     _launch (multiprocessing/popen_fork.py:71)
     _Popen (multiprocessing/context.py:282)
     start (multiprocessing/managers.py:562)
     Manager (multiprocessing/context.py:57)
     __init__ (.../cncf/kubernetes/executors/kubernetes_executor.py:104)   # 
self._manager = multiprocessing.Manager()
     load_executor (airflow/executors/executor_loader.py:368)
     get_default_executor (airflow/executors/executor_loader.py:294)
     _get_executor_get_task_log (airflow/utils/log/file_task_handler.py:577)
     _read (airflow/utils/log/file_task_handler.py:637)
     read (airflow/utils/log/file_task_handler.py:767)
     read_log_stream (airflow/utils/log/log_reader.py:132)
     _buffered_ndjson_stream 
(airflow/api_fastapi/core_api/routes/public/log.py:70)
   ```
    
   The relevant call is unconditional for running tasks:
    
   ```python
   # airflow/utils/log/file_task_handler.py  (_read)
   if ti.state == TaskInstanceState.RUNNING:
       executor_get_task_log = self._get_executor_get_task_log(ti)   # -> 
get_default_executor() -> KubernetesExecutor()
       response = executor_get_task_log(ti, try_number)
   ```
    
   ```python
   # providers/cncf/kubernetes/executors/kubernetes_executor.py  (__init__, 
~line 104)
   self._manager = multiprocessing.Manager()
   self.task_queue   = self._manager.JoinableQueue()
   self.result_queue = self._manager.JoinableQueue()
   ```
   
   ## How to reproduce
    
   1. Deploy Airflow 3.2.1 with `executor = KubernetesExecutor` and a 
multi-replica API server (gunicorn).
   2. Start a DAG so a task is in the running state.
   3. Open that running task's logs in the UI (or `GET 
/api/v2/dags/{dag}/dagRuns/{run}/taskInstances/{task}/logs/{n}`).
   4. On the API-server pod, observe a new forked process per serving worker:
   ```bash
      kubectl exec <api-server-pod> -c api-server -- ps -eo pid,ppid,rss,args | 
grep gunicorn
      # a "gunicorn: worker" whose PPID is another worker (not the master); 
py-spy shows it in
      # multiprocessing managers.serve_forever -> KubernetesExecutor.__init__
   ```
    
   5. It is never reaped; with worker refresh enabled it orphans to PID 1 and 
accumulates.
   
   ### What you think should happen instead?
   
   Reading a running task's logs on the API server should not instantiate a 
full executor with a `multiprocessing.Manager()` (the API server never runs the 
executor's scheduling loop). Options:
    
   - Fetch `get_task_log` without constructing/initializing the executor's 
runtime resources (e.g. a lightweight/classmethod path, or lazily create the 
`Manager` only when the scheduler actually starts the executor), and/or
   - Ensure any executor instantiated purely for `get_task_log` is properly 
shut down (`Manager.shutdown()` / `executor.end()`), so no `serve_forever` 
process is leaked.
   
   ### Operating System
   
   Linux (EKS / Debian-based official image)
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Apache Airflow Provider(s)
   
   _No response_
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Official Helm Chart version
   
   1.17.0
   
   ### Kubernetes Version
   
   1.35
   
   ### Helm Chart configuration
   
   _No response_
   
   ### Docker Image customizations
   
   _No response_
   
   ### Anything else?
   
   - Impact scales with worker recycling: each `worker_refresh` (or 
`max_requests`) cycle orphans the per-worker `Manager`, so leaked `Manager`s 
grow over time and push the API server toward OOM.
   - **Workaround we used:** disable worker refresh to cap it at ~1 
`Manager`/worker, and restart the deployment to clear orphans. This bounds but 
does not eliminate the leak.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to