Taragolis commented on issue #35474:
URL: https://github.com/apache/airflow/issues/35474#issuecomment-1801592051
I think yes we need an additional escalation level for execution timeout.
The problem of current implementation that we raise an error in handler
function and have no idea in which place in Main thread it actually raised, as
result user code, third-party libraries and airflow components itself might
prevent exit from the task.
In general in most cases it works fine, even according to description of the
issue "Once every ~500-1000 runs approximately, the task hangs up infinitely
until manually killed"
Just for demonstrate current flaky behaviour with a bit greater probability
to survive after TimeoutError
```python
import signal
import time
class TimeoutError(Exception):
...
def flacky_behaviour():
ix = 0
delay = 0.005
each_second = 1 / delay
while True:
# Handle all regular exceptions here
try:
# SIGALRM Handler raise error here? Then code survive
# change TimeoutError base class to BaseException might help in
this case
time.sleep(delay)
except Exception:
print("Nope")
# Handle all exceptions include ``SystemExit`` (sys.exit) and
``KeyboardInterrupt`` (SIGINT)
try:
# SIGALRM Handler raise error here?
# Well... better luck next time
time.sleep(delay)
except:
print("Serious NOPE!")
# If error happen outside of try block then execution would terminate
time.sleep(delay * 2)
if ix % each_second == 0:
print("It's alive!")
ix += 1
def handler(signum, frame):
print(f"Raise TimeoutError, MRO {TimeoutError.mro()}")
raise TimeoutError("Timeout!")
TIMEOUT = 2
signal.signal(signal.SIGALRM, handler)
signal.alarm(TIMEOUT)
try:
flacky_behaviour()
finally:
signal.alarm(0)
```
There is couple of different solution (an combination) in my head which
might make things better (or worse).
Maybe better move this discussion into Dev List for wider group of people
Option 1
---
https://github.com/apache/airflow/blob/de92a81f002e6c1b3e74ad9d074438b65acb87b6/airflow/exceptions.py#L81-L82
Inherit `AirflowTaskTimeout` from `BaseException` rather than
`AirflowException`, it should help in case of
`try ... except Exception:`, It doesn't help in case if upstream code use
`try ... except:` but such code is a Worst Practice in python anyway.
Ideally we could think about changing inheritance of AirflowException from
`Exception` to `BaseException` in Airflow 3, and make most of exceptions
internal only and make only couple of then as part of public interface
Option 2
---
In additional to exception write TIMEOUT state into the DB backend, so
upstream processes could kill hung process
In some circumstances would breaks `silent_fail` of BaseSensorOperator and
might be `on_kill` methods
Option 3
---
Introduce new task state TIMEOUT and control execution timeout in Scheduler.
Well this one require bigger effort to implements (JobRunners, new trigger
rules) and think have it also have some side effects include the fact that some
contributors/PMC would be against of new state since it add complexability to
the already complex system
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]