pcolladosoto commented on issue #22191: URL: https://github.com/apache/airflow/issues/22191#issuecomment-1074273533
# Trying to explain things...

Our team has run into this issue time and time again. We have tried different combinations of both Airflow and Python versions to no avail.

## TL;DR

When a `DAGFileProcessor` hangs and is killed due to a timeout, we believe the `self.waitables` and `self._processors` attributes of the `DAGFileProcessorManager` are not updated as they should be. This later causes an unhandled exception when the manager tries to receive data on a pipe end (i.e. a file descriptor) that has already been closed.

## The long read...

We're running a decoupled Airflow deployment within a k8s cluster. We currently use a 3-container *pod* where one container runs the *Web Server*, another executes the *Scheduler*, and the third implements *Flower* (we're using the *CeleryExecutor*). The backbone of the deployment is a *StatefulSet* that runs the Celery workers themselves.

The trace we were seeing on the scheduler time and time again was:

```
Process ForkServerProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
```

This particular trace was thrown by Airflow 2.1.3, but we've seen very similar (if not identical) variations on versions all the way up to Airflow 2.2.4.

Given that we traced the problem down to the way multiprocessing synchronisation is handled, we played around with `multiprocessing`'s [*start method*](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods) through the `mp_start_method` configuration parameter, which isn't included in the stock configuration example:

https://github.com/apache/airflow/blob/f309ea78f7d8b62383bc41eac217681a0916382b/airflow/utils/mixins.py#L27-L38

The containers we use leverage `fork` as the default way of creating new processes. After trying that one out we moved on to `spawn` and ended up settling on `forkserver`. No matter which *start method* we used, we ran into the same issue over and over again. For a while we coped with this behaviour by simply restarting the Airflow deployment on an hourly basis, but today we decided to set some time apart to dig a bit deeper.

The good news is that, after a thorough investigation, we noticed a pattern that preceded the crash. In order to pin it down we ran [`ps(1)`](https://www.man7.org/linux/man-pages/man1/ps.1.html) on the scheduler container. We also monitored the *DAG Processor Manager* log (which lives at `/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log`, given our Airflow home is `/opt/airflow`) and took a look at the scheduler's log through `kubectl logs`, given it's sent to *stdout/stderr*.
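The failure mode in that traceback is easy to reproduce outside Airflow. The following standalone sketch (our own illustration, not Airflow code) opens a `multiprocessing` pipe, closes one end the way `_kill_process()` does, and then `poll()`s it the way the manager later does when collecting results:

```python
from multiprocessing import Pipe

# Open a duplex pipe like the one used for parent/child communication.
parent_end, child_end = Pipe()

# Simulate what `_kill_process()` does: close the parent's end of the pipe.
parent_end.close()

# Simulate what the manager does later when collecting results: poll the
# (now closed) parent end. This raises the exact error from the traceback.
try:
    parent_end.poll()
except OSError as exc:
    err = exc

print(err)  # → handle is closed
```

Once a `Connection` is closed, every subsequent `poll()`/`recv()` fails its internal `_check_closed()` test, which is the last frame of the traceback above.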
The pattern itself goes something like:

1. A `DAGFileProcessor` gets stuck for longer than `dag_file_processor_timeout`, as seen in `ps`' output.
2. As soon as the timeout is exceeded, the `DAGFileProcessorManager` kills the stuck `DAGFileProcessor`.
3. When the `DAGFileProcessorManager` tries to collect results back from the different `DAGFileProcessor`s, it crashes.

The above led us to believe something was a bit off in the way the `DAGFileProcessor`s were being killed. Given that our Docker-based deployment allowed for it, we retrieved a copy of the stock [`manager.py`](https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py) and [`processor.py`](https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py) files and added a bit of logging through `self.log.debug()`. The following appeared in our `DAGFileProcessorManager` log:

```
[2022-03-21 13:01:00,747] {manager.py:1163} DEBUG - FOO - Looking for DAG processors to terminate due to timeouts!
[2022-03-21 13:01:00,748] {manager.py:1172} ERROR - Processor for /opt/airflow/dags/dags-airflow/spark_streaming.py with PID 965 started at 2022-03-21T13:00:10.536124+00:00 has timed out, killing it.
[2022-03-21 13:01:00,748] {manager.py:1178} DEBUG - FOO - # of waitables BEFORE killing timed out processor: 2
[2022-03-21 13:01:00,748] {manager.py:1180} DEBUG - FOO - # of waitables AFTER killing timed out processor: 2
[2022-03-21 13:01:00,748] {manager.py:1013} DEBUG - 1/1 DAG parsing processes running
[2022-03-21 13:01:00,748] {manager.py:1015} DEBUG - 2 file paths queued for processing
[2022-03-21 13:01:00,758] {manager.py:978} DEBUG - Processor for /opt/airflow/dags/dags-airflow/spark_streaming.py finished
[2022-03-21 13:01:00,758] {manager.py:982} DEBUG - FOO - Trying to get result for processor with PID: 965
```

Can you see how the number of `waitables` (more on that later) doesn't change even though we're killing a `DAGFileProcessor`?
We believe that's what's causing the trouble...

Note we added the `- FOO -` token to our logging entries so we could easily `grep` for them. These entries were generated with calls to `self.log.debug()` within the `_kill_timed_out_processors()` method. The stock version is:

https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L1159-L1175

After we added the extra logging it looked like:

```python
def _kill_timed_out_processors(self):
    """Kill any file processors that timeout to defend against process hangs."""
    self.log.debug("FOO - Looking for DAG processors to terminate due to timeouts!")
    now = timezone.utcnow()
    processors_to_remove = []
    for file_path, processor in self._processors.items():
        duration = now - processor.start_time
        if duration > self._processor_timeout:
            self.log.error(
                "Processor for %s with PID %s started at %s has timed out, killing it.",
                file_path,
                processor.pid,
                processor.start_time.isoformat(),
            )
            Stats.decr('dag_processing.processes')
            Stats.incr('dag_processing.processor_timeouts')
            # TODO: Remove after Airflow 2.0
            Stats.incr('dag_file_processor_timeouts')
            self.log.debug(f"FOO - # of waitables BEFORE killing timed out processor: {len(self.waitables)}")
            processor.kill()
            self.log.debug(f"FOO - # of waitables AFTER killing timed out processor: {len(self.waitables)}")
```

You can see how we call `kill()` (which basically wraps the processor's `_kill_process()` method) on the timed-out processor. We believe the key to all this resides on line 246 of:

https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L238-L246

The pipe end being closed there is the very one opened on line 187:

https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L187

That's exactly the same pipe end (i.e. file descriptor) the `DAGFileProcessorManager` tries to read from later on!
If we look at the traceback we included before, it's line 286 that ultimately triggers the `OSError`:

https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L286

What it's trying to do is `poll()` a closed pipe. If we take a look at `multiprocessing`'s implementation we'll see how, as shown in the traceback, it calls `_check_closed()` on the pipe's file descriptor (i.e. handle) before proceeding: this, as seen before, is what raises the `OSError`.

So... why are we trying to collect results from a `DAGFileProcessor` we killed due to a timeout? To answer that, we took a walk around `_run_parsing_loop()`:

https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L612-L734

It basically runs an infinite loop (unless we specify a maximum number of runs) that calls [`multiprocessing.connection.wait`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.wait) on the contents of `self.waitables`. This attribute is a dictionary referencing the different `DAGFileProcessor`s spawned by the `DAGFileProcessorManager`. Entries are added on line 1034 within `start_new_process()`:

https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L1015-L1034

However, this dictionary **is not** updated when a processor is killed due to a timeout. You can check that in the snippet we included above. Thus, after the timed-out `DAGFileProcessor` is killed, the infinite loop in `_run_parsing_loop()` will assume the underlying process is ready (it's "done", as we've effectively terminated it) and will try to read from the pipe end we closed in `_kill_process()`, triggering the exception and bringing everything down.

In other words, we believe the `self.waitables` attribute is not being updated as it should be when `DAGFileProcessor`s are terminated due to timeouts. The same goes for the `self._processors` attribute of the `DAGFileProcessorManager`.
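To convince ourselves of this mechanism, we put together a small standalone model (our own sketch, not Airflow code) of the manager's loop: a `waitables`-style dict, a child that hangs, a `SIGKILL`, and a call to `multiprocessing.connection.wait()`. Because the dict is never updated, `wait()` happily reports the dead child's pipe end as ready:

```python
import os
import signal
import time
from multiprocessing import get_context
from multiprocessing.connection import wait

def hung_child(conn):
    # Stand-in for a DAG file processor that hangs and never sends a result.
    time.sleep(60)

ctx = get_context("fork")  # POSIX-only sketch; `fork` keeps it guard-free
parent_end, child_end = ctx.Pipe()
proc = ctx.Process(target=hung_child, args=(child_end,))
proc.start()
child_end.close()  # the parent keeps only its own end

# Stand-in for the manager's `self.waitables`: never updated on kill.
waitables = {parent_end: proc}

# Kill the hung child like `_kill_timed_out_processors()` does...
os.kill(proc.pid, signal.SIGKILL)
proc.join()

# ...but leave `waitables` untouched. The dead child's pipe end is closed by
# the OS, so the parent's end hits EOF and wait() reports it as "ready".
ready = wait(list(waitables), timeout=5)
print(parent_end in ready)  # → True
```

At this point the real manager would call `processor.result` → `done` → `poll()` on a channel that `_kill_process()` already closed, raising `OSError("handle is closed")`.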
After all, `_kill_timed_out_processors()` iterates over its contents... If we don't update it too, we'll try to kill an already terminated process over and over again. After some testing we arrived at the following implementation of `_kill_timed_out_processors()`:

```python
def _kill_timed_out_processors(self):
    """Kill any file processors that timeout to defend against process hangs."""
    self.log.debug("FOO - ** Entering _kill_timed_out_processors() **")

    # We'll get a clear picture of what these two attributes look like before
    # killing anything.
    self.log.debug(f"FOO - We'll iterate over: {self._processors}")
    self.log.debug(f"FOO - Current waitables dir: {self.waitables}")

    now = timezone.utcnow()
    processors_to_remove = []
    for file_path, processor in self._processors.items():
        duration = now - processor.start_time
        if duration > self._processor_timeout:
            self.log.error(
                "Processor for %s with PID %s started at %s has timed out, killing it.",
                file_path,
                processor.pid,
                processor.start_time.isoformat(),
            )
            Stats.decr('dag_processing.processes')
            Stats.incr('dag_processing.processor_timeouts')
            # TODO: Remove after Airflow 2.0
            Stats.incr('dag_file_processor_timeouts')

            # Add some logging to check stuff out
            self.log.debug(f"FOO - # of waitables BEFORE killing timed out processor: {len(self.waitables)}")
            self.log.debug(f"FOO - We'll iterate over: {self._processors}")
            self.log.debug(f"FOO - Current waitables dir: {self.waitables}")

            # Kill the hanged processor
            processor.kill()

            # Update self.waitables to avoid asking for results later on
            self.waitables.pop(processor.waitable_handle)

            # Make a note of the file_paths we are to remove later on: we feel a bit
            # uneasy about modifying the container we're currently iterating over...
            processors_to_remove.append(file_path)

            # Add some logging to check how stuff is doing...
            self.log.debug(f"FOO - # of waitables AFTER killing timed out processor: {len(self.waitables)}")
            self.log.debug(f"FOO - We'll iterate over: {self._processors}")
            self.log.debug(f"FOO - Current waitables dir: {self.waitables}")

    # Clean up `self._processors` too!
    for proc in processors_to_remove:
        self._processors.pop(proc)

    # And after we're done take a look at the final state
    self.log.debug(f"FOO - Processors after cleanup: {self._processors}")
    self.log.debug(f"FOO - Waitables after cleanup: {self.waitables}")
    self.log.debug(f"FOO - ** Leaving _kill_timed_out_processors() **")
```

We know the above can surely be written more succinctly: we're by no means good programmers! Against all odds, the code above seems to prevent the crash! :tada:

It does, however, leave zombies behind when we kill the `DAGFileProcessor`: the killed process is never `wait()`ed for... We decided to also play around with the `DAGFileProcessor`'s `_kill_process()` method a bit, in the name of science, to try and prevent those zombies from spawning:

```python
def _kill_process(self) -> None:
    if self._process is None:
        raise AirflowException("Tried to kill process before starting!")

    if self._process.is_alive() and self._process.pid:
        self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
        os.kill(self._process.pid, signal.SIGKILL)

        # Reap the resulting zombie! Note the call to `waitpid()` blocks unless we
        # leverage the `WNOHANG` (https://docs.python.org/3/library/os.html#os.WNOHANG)
        # option. Given we were just playing around, we decided not to bother with
        # that and passed 0 (i.e. a blocking wait) as the options argument.
        self.log.warning(f"FOO - Waiting to reap the zombie spawned from PID {self._process.pid}")
        os.waitpid(self._process.pid, 0)
        self.log.warning(f"FOO - Reaped the zombie spawned from PID {self._process.pid}")

    if self._parent_channel:
        self._parent_channel.close()
```

From what we could see, the above reaped the zombie just as we initially expected it to.
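The reap-the-zombie idea can be checked in isolation too. This little POSIX sketch (ours, not Airflow code) forks a hung child, `SIGKILL`s it the way `_kill_process()` does, and then reaps it with a blocking `os.waitpid()`; note that `waitpid()` takes a mandatory options argument, where `0` means "block until the child exits" and `os.WNOHANG` would return immediately instead:

```python
import os
import signal
import time

# Fork a child that hangs, mimicking a stuck DAG file processor.
pid = os.fork()
if pid == 0:
    time.sleep(60)  # child: "hang" indefinitely
    os._exit(0)

# Parent: kill the child the way `_kill_process()` does...
os.kill(pid, signal.SIGKILL)

# ...then reap it so no zombie is left behind (0 == blocking wait).
reaped_pid, status = os.waitpid(pid, 0)

print(reaped_pid == pid)                      # → True
print(os.WIFSIGNALED(status))                 # → True: the child died from a signal
print(os.WTERMSIG(status) == signal.SIGKILL)  # → True
```

Without the `waitpid()` call, the killed child would linger in the process table as a `<defunct>` entry until its parent exits, which is exactly what we saw in `ps`' output.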
So, after all this nonsense, we just wanted to end by saying that we believe it's the way the `DAGFileProcessorManager`'s attributes are cleaned up that crashes Airflow for us. In our experience this is triggered by a `DAGFileProcessor` being forcefully terminated after a timeout.

We would also like to thank everybody making Airflow possible: it's one heck of a tool!

Feel free to ask for more details and, if we got anything wrong (it wouldn't be the first time), please do let us know!