pcolladosoto commented on issue #22191:
URL: https://github.com/apache/airflow/issues/22191#issuecomment-1074273533


   # Trying to explain things...
   Our team has run into this issue time and time again. We have tried 
different combinations of both Airflow and Python versions to no avail.
   
   ## TL;DR
   When a `DAGFileProcessor` hangs and is killed due to a timeout, we believe the `self.waitables` and `self._processors` attributes of the `DAGFileProcessorManager` are not being updated as they should be. This causes an unhandled exception when the manager tries to receive data on a pipe end (i.e. file descriptor) which has already been closed.
   
   ## The long read...
   We're running a decoupled Airflow deployment within a k8s cluster. We are currently using a 3-container *pod* where one container runs the *Web Server*, another one executes the *Scheduler* and the third one implements *Flower* (we're using the *CeleryExecutor*). The backbone of the deployment is a *StatefulSet* that runs the Celery workers themselves.
   
   The traceback we kept seeing on the scheduler was:
   
   ```
   Process ForkServerProcess-1:
   Traceback (most recent call last):
     File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
       self.run()
     File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
       self._target(*self._args, **self._kwargs)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
       processor_manager.start()
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 610, in start
       return self._run_parsing_loop()
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
       self._collect_results_from_processor(processor)
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
       if processor.result is not None:
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 321, in result
       if not self.done:
     File "/home/airflow/.local/lib/python3.7/site-packages/airflow/dag_processing/processor.py", line 286, in done
       if self._parent_channel.poll():
     File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 255, in poll
       self._check_closed()
     File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 136, in _check_closed
       raise OSError("handle is closed")
   OSError: handle is closed
   ```
   
   This particular traceback was thrown by Airflow 2.1.3, but we've seen very similar (if not identical) variations with versions all the way up to Airflow 2.2.4.
   
   Given we traced the problem down to the way multiprocessing synchronisation was being handled, we played around with `multiprocessing`'s [*start method*](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods) through the `mp_start_method` configuration parameter, which isn't included in the stock configuration example:
   
   
https://github.com/apache/airflow/blob/f309ea78f7d8b62383bc41eac217681a0916382b/airflow/utils/mixins.py#L27-L38
   
   The containers we are using leverage `fork` as the default way of creating new processes. After trying that one out we moved on to `spawn` and ended up settling on `forkserver`. No matter the *start method* we leveraged, we ran into the same issue over and over again.
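
   In case it helps anybody poking at this, the snippet below is our paraphrase of what the mixin linked above does when resolving the start method (it is **not** the verbatim Airflow code): honour `[core] mp_start_method` when it's set and otherwise fall back to the interpreter's default, which is `fork` on the images we run.

   ```python
   import multiprocessing

   from airflow.configuration import conf

   # Paraphrase of the start-method resolution shown in the mixin linked above:
   # use [core] mp_start_method when it's configured, else the platform default.
   if conf.has_option('core', 'mp_start_method'):
       start_method = conf.get('core', 'mp_start_method')
   else:
       start_method = multiprocessing.get_start_method()

   print(f"DAG processing will spawn children with the '{start_method}' start method")
   ```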
   
   For a while we coped with this behaviour by simply restarting the Airflow deployment on an hourly basis, but we decided to set some time apart today to delve a bit deeper into all this. The good news is that, after a thorough investigation, we noticed a pattern that preceded the crash.
   
   In order to pin it down we ran [`ps(1)`](https://www.man7.org/linux/man-pages/man1/ps.1.html) on the scheduler container. We also monitored the *DAG Processor Manager* log (which we have at `/opt/airflow/logs/dag_processor_manager/dag_processor_manager.log`, given our Airflow home is `/opt/airflow`) and we took a look at the scheduler's log through `kubectl logs`, given it's sent to *stdout/stderr*. The pattern itself goes something like:
   
   1. A `DAGFileProcessor` gets stuck for longer than `dag_file_processor_timeout`, as seen in `ps`' output.
   2. As soon as the timeout is exceeded, the `DAGFileProcessorManager` kills 
the stuck `DAGFileProcessor`.
   3. When the `DAGFileProcessorManager` tries to collect results back from the different `DAGFileProcessor`s, it crashes.
   
   The above led us to believe something was a bit off in the way the `DAGFileProcessor`s were being killed. Given our Docker-based deployment allowed for it, we retrieved a copy of the stock [`manager.py`](https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py) and [`processor.py`](https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py) files and added a bit of logging through `self.log.debug()`. The following appeared in our `DAGFileProcessorManager` log:
   
   ```
   [2022-03-21 13:01:00,747] {manager.py:1163} DEBUG - FOO - Looking for DAG processors to terminate due to timeouts!
   [2022-03-21 13:01:00,748] {manager.py:1172} ERROR - Processor for /opt/airflow/dags/dags-airflow/spark_streaming.py with PID 965 started at 2022-03-21T13:00:10.536124+00:00 has timed out, killing it.
   [2022-03-21 13:01:00,748] {manager.py:1178} DEBUG - FOO - # of waitables BEFORE killing timed out processor: 2
   [2022-03-21 13:01:00,748] {manager.py:1180} DEBUG - FOO - # of waitables AFTER killing timed out processor: 2
   [2022-03-21 13:01:00,748] {manager.py:1013} DEBUG - 1/1 DAG parsing processes running
   [2022-03-21 13:01:00,748] {manager.py:1015} DEBUG - 2 file paths queued for processing
   [2022-03-21 13:01:00,758] {manager.py:978} DEBUG - Processor for /opt/airflow/dags/dags-airflow/spark_streaming.py finished
   [2022-03-21 13:01:00,758] {manager.py:982} DEBUG - FOO - Trying to get result for processor with PID: 965
   ```
   
   Can you see how the number of `waitables` (more on that later) doesn't 
change even though we're killing a `DAGFileProcessor`? We believe that's what's 
causing the trouble...
   
   Note we added the `- FOO -` token to our extra logging entries so we could easily `grep` for them. These entries were generated with calls to `self.log.debug()` within the `_kill_timed_out_processors()` function. The 'stock' version is:
   
   
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L1159-L1175
   
   With the additional logging in place it looked like this:
   
   ```python
   def _kill_timed_out_processors(self):
       """Kill any file processors that timeout to defend against process hangs."""
       self.log.debug("FOO - Looking for DAG processors to terminate due to timeouts!")
       now = timezone.utcnow()
       processors_to_remove = []
       for file_path, processor in self._processors.items():
           duration = now - processor.start_time
           if duration > self._processor_timeout:
               self.log.error(
                   "Processor for %s with PID %s started at %s has timed out, killing it.",
                   file_path,
                   processor.pid,
                   processor.start_time.isoformat(),
               )
               Stats.decr('dag_processing.processes')
               Stats.incr('dag_processing.processor_timeouts')
               # TODO: Remove after Airflow 2.0
               Stats.incr('dag_file_processor_timeouts')
               self.log.debug(f"FOO - # of waitables BEFORE killing timed out processor: {len(self.waitables)}")
               processor.kill()
               self.log.debug(f"FOO - # of waitables AFTER killing timed out processor: {len(self.waitables)}")
   ```
   
   You can see how we call `kill()` (which basically wraps the processor's `_kill_process()` method) on the timed out processor. We believe the key to all this resides in `line 246` of:
   
   
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L238-L246
   
   Notice how the pipe end being closed there (`line 246`) is one end of the communication pipe opened on `line 187`:
   
   
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L187
   
   That's exactly the same pipe end (i.e. file descriptor) the `DAGFileProcessorManager` tries to read from later on! If we look at the traceback we included before, it's `line 286` that ultimately triggers the `OSError`:
   
   
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/processor.py#L286
   
   What it's trying to do is `poll()` a closed pipe. If we take a look at `multiprocessing`'s implementation we'll see how, as shown in the traceback, it calls `_check_closed()` on the pipe's file descriptor (i.e. handle) before proceeding: this is what ultimately raises the `OSError`.
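
   For anybody who wants to see the failure in isolation, the following toy snippet (our own example using plain `multiprocessing`, not Airflow code) raises the very same error:

   ```python
   from multiprocessing import Pipe

   # Toy reproduction of the failure mode described above: close one end of a
   # pipe (what `_kill_process()` does to the parent channel) and then poll()
   # it (what the `done` property does when results are collected later on).
   parent_conn, child_conn = Pipe()
   parent_conn.close()

   try:
       parent_conn.poll()
   except OSError as err:
       print(err)  # -> handle is closed
   ```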
   
   So... why are we trying to collect results from a `DAGFileProcessor` we killed due to a timeout? In order to answer that we took a walk around `_run_parsing_loop()`:
   
   
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L612-L734
   
   It basically runs an infinite loop (unless we specify a maximum number of runs) that calls [`multiprocessing.connection.wait`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.connection.wait) based on the contents of `self.waitables`. This attribute is a dictionary holding a reference to each of the `DAGFileProcessor`s spawned by the `DAGFileProcessorManager`. Entries are added on `line 1034` within `start_new_process()`:
   
   
https://github.com/apache/airflow/blob/v2-1-stable/airflow/dag_processing/manager.py#L1015-L1034
   
   However, this dictionary **is not** updated when a processor is killed due to a timeout, as you can check in the snippet we included above. Thus, after the timed out `DAGFileProcessor` is killed, the infinite loop in `_run_parsing_loop()` will see the underlying process as ready (it's done, as we've effectively terminated it) and will try to read from the pipe end we closed in `_kill_process()`, triggering the exception and bringing everything down. In other words, we believe the `self.waitables` attribute is not being updated as it should be when `DAGFileProcessor`s are terminated due to timeouts. The same goes for the `self._processors` attribute of the `DAGFileProcessorManager`: after all, `_kill_timed_out_processors()` iterates over its contents, so if we don't update it too we'll keep trying to kill an already terminated process over and over again.
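
   The toy sketch below (again ours, not Airflow code) shows why the parsing loop wakes up for a processor we just killed: the handle stored in `self.waitables` becomes ready as soon as the child process dies, so `wait()` hands the stale entry straight back to the result-collection logic.

   ```python
   import multiprocessing
   import multiprocessing.connection
   import time


   def _pretend_to_parse_dags():
       time.sleep(3600)  # stand-in for a DAG file that hangs the processor


   if __name__ == "__main__":
       child = multiprocessing.Process(target=_pretend_to_parse_dags)
       child.start()

       # Mimics the bookkeeping done in start_new_process(): map a waitable
       # handle back to the processor that owns it.
       waitables = {child.sentinel: child}

       child.kill()  # roughly what processor.kill() boils down to on a timeout

       # The killed child is reported as "ready" right away, so a loop that
       # doesn't prune `waitables` will happily try to collect results from it.
       ready = multiprocessing.connection.wait(list(waitables), timeout=5)
       print(child.sentinel in ready)  # True

       child.join()  # reap the child so this toy example exits cleanly
   ```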
   
   After some testing we arrived at the following implementation of 
`_kill_timed_out_processors()`:
   
   ```python
   def _kill_timed_out_processors(self):
       """Kill any file processors that timeout to defend against process hangs."""
       self.log.debug("FOO - ** Entering _kill_timed_out_processors() **")

       # We'll get a clear picture of what these two attributes look like
       # before killing anything.
       self.log.debug(f"FOO - We'll iterate over: {self._processors}")
       self.log.debug(f"FOO - Current waitables dir: {self.waitables}")
       now = timezone.utcnow()
       processors_to_remove = []
       for file_path, processor in self._processors.items():
           duration = now - processor.start_time
           if duration > self._processor_timeout:
               self.log.error(
                   "Processor for %s with PID %s started at %s has timed out, killing it.",
                   file_path,
                   processor.pid,
                   processor.start_time.isoformat(),
               )
               Stats.decr('dag_processing.processes')
               Stats.incr('dag_processing.processor_timeouts')
               # TODO: Remove after Airflow 2.0
               Stats.incr('dag_file_processor_timeouts')

               # Add some logging to check stuff out
               self.log.debug(f"FOO - # of waitables BEFORE killing timed out processor: {len(self.waitables)}")
               self.log.debug(f"FOO - We'll iterate over: {self._processors}")
               self.log.debug(f"FOO - Current waitables dir: {self.waitables}")

               # Kill the hanged processor
               processor.kill()

               # Update self.waitables to avoid asking for results later on
               self.waitables.pop(processor.waitable_handle)

               # Make a note of the file_paths we are to remove later on: we feel
               # a bit uneasy about modifying the container we're currently
               # iterating over...
               processors_to_remove.append(file_path)

               # Add some logging to check how stuff is doing...
               self.log.debug(f"FOO - # of waitables AFTER killing timed out processor: {len(self.waitables)}")
               self.log.debug(f"FOO - We'll iterate over: {self._processors}")
               self.log.debug(f"FOO - Current waitables dir: {self.waitables}")

       # Clean up `self._processors` too!
       for proc in processors_to_remove:
           self._processors.pop(proc)

       # And after we're done take a look at the final state
       self.log.debug(f"FOO - Processors after cleanup: {self._processors}")
       self.log.debug(f"FOO - Waitables after cleanup:  {self.waitables}")
       self.log.debug(f"FOO - ** Leaving _kill_timed_out_processors() **")
   ```
   
   We know the above can surely be written in a more succinct/better way: we're 
by no means good programmers!
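
   For what it's worth, a slightly tighter sketch of the same idea (untested beyond the behaviour we describe in this comment) would be to iterate over a snapshot of the processors and prune both dictionaries in place:

   ```python
   def _kill_timed_out_processors(self):
       """Kill any file processors that timeout to defend against process hangs."""
       now = timezone.utcnow()
       # Iterate over a snapshot so we can safely pop from self._processors below.
       for file_path, processor in list(self._processors.items()):
           if now - processor.start_time > self._processor_timeout:
               self.log.error(
                   "Processor for %s with PID %s started at %s has timed out, killing it.",
                   file_path,
                   processor.pid,
                   processor.start_time.isoformat(),
               )
               Stats.decr('dag_processing.processes')
               Stats.incr('dag_processing.processor_timeouts')
               # TODO: Remove after Airflow 2.0
               Stats.incr('dag_file_processor_timeouts')
               processor.kill()
               # Forget the dead processor so the parsing loop never polls its
               # closed channel again.
               self.waitables.pop(processor.waitable_handle, None)
               self._processors.pop(file_path)
   ```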
   
   Against all odds, the code above seems to prevent the crash! :tada: It does, however, leave a zombie behind every time we kill a timed out `DAGFileProcessor`: the killed process is never `wait()`ed for...
   
   We also decided to play around with the `DAGFileProcessor`'s `_kill_process()` method a bit in the name of science to try and get rid of that zombie:
   
   ```python
   def _kill_process(self) -> None:
       if self._process is None:
           raise AirflowException("Tried to kill process before starting!")

       if self._process.is_alive() and self._process.pid:
           self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
           os.kill(self._process.pid, signal.SIGKILL)

           # Reap the resulting zombie! Note the call to `waitpid()` blocks unless we
           # leverage the `WNOHANG` option
           # (https://docs.python.org/3/library/os.html#os.WNOHANG).
           # Given we were just playing around we decided not to bother with that...
           self.log.warning(f"FOO - Waiting to reap the zombie spawned from PID {self._process.pid}")
           os.waitpid(self._process.pid, 0)  # 0 -> block until the child exits
           self.log.warning(f"FOO - Reaped the zombie spawned from PID {self._process.pid}")
       if self._parent_channel:
           self._parent_channel.close()
   ```
   
   From what we could see, the above reaped the zombie like we initially 
expected it to.
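
   For anybody wanting to avoid the blocking `waitpid()` we settled for, a rough sketch of the `WNOHANG` variant we hinted at above (our own illustration with a made-up helper name, not something we've run in production) could look like:

   ```python
   import os
   import signal
   import time


   def _kill_and_reap(pid: int, attempts: int = 10) -> bool:
       """Kill `pid` with SIGKILL and reap the zombie without blocking forever."""
       os.kill(pid, signal.SIGKILL)
       for _ in range(attempts):
           try:
               reaped_pid, _status = os.waitpid(pid, os.WNOHANG)
           except ChildProcessError:
               return True  # the child was already reaped elsewhere
           if reaped_pid == pid:
               return True  # zombie collected
           time.sleep(0.1)  # give the kernel a moment after the SIGKILL
       return False  # still a zombie; the caller may want to log or retry
   ```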
   
   So, after all this nonsense we just wanted to wrap up by saying that we believe it's the way the `DAGFileProcessorManager`'s attributes are (not) being cleaned up that crashes Airflow for us. In our experience this is triggered by a `DAGFileProcessor` being forcefully terminated after a timeout.
   
   We would also like to thank everybody making Airflow possible: it's one heck 
of a tool!
   
   Feel free to ask for more details and, if we got anything wrong (it wouldn't 
be the first time), please do let us know!
   

