potiuk edited a comment on pull request #19860:
URL: https://github.com/apache/airflow/pull/19860#issuecomment-982481602
OK. I have very interesting findings and I might need some brainstorming.
@ashb @ephraimbuddy @uranusjr.
I think I know what is causing the problem, but I have not figured out the
mechanism (yet). I am running some experiments to get more data but the problem
with `test_scheduler_keeps_scheduling_pool_full` is most likely caused by side
effect of
`test_scheduler_multiprocessing_with_spawn_method` - and I am afraid it
could have some real consequences on systems that are using `spawn` method for
multiprocessing.
This seems to be caused by a deferred re-import of `airflow.settings`
triggered by the DagProcessor from the `spawn` test. However, I am scratching
my head and cannot figure out how such a reload in a (seemingly) different
process, with a Python interpreter started from scratch (this is how spawn
works), could influence tests running in a different process. It might be
related to some shared memory or resources at the database "driver" level
(this is the only hypothesis I have left).
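To illustrate the "interpreter started from scratch" point: a fresh interpreter re-runs every import-time side effect of the modules it imports, which is why a log line emitted at import time can show up again from a child. The sketch below is not Airflow code. `fake_settings` is a hypothetical stand-in module, and a plain `subprocess` call stands in for what `spawn` does when it boots a new interpreter.

```python
import os
import subprocess
import sys
import tempfile
import textwrap

# Hypothetical stand-in for a settings module with an import-time side effect.
FAKE_SETTINGS = textwrap.dedent(
    """
    import os
    print(f"fake_settings imported in pid {os.getpid()}")
    """
)


def import_in_fresh_interpreter() -> str:
    """Import the stand-in module in a brand-new interpreter; return its stdout."""
    with tempfile.TemporaryDirectory() as tmp:
        with open(os.path.join(tmp, "fake_settings.py"), "w") as f:
            f.write(FAKE_SETTINGS)
        # Make the temporary module importable in the child interpreter.
        env = dict(os.environ, PYTHONPATH=tmp)
        result = subprocess.run(
            [sys.executable, "-c", "import fake_settings"],
            env=env,
            capture_output=True,
            text=True,
            check=True,
        )
    return result.stdout.strip()


if __name__ == "__main__":
    print(f"parent pid {os.getpid()}")
    print(import_in_fresh_interpreter())
```

The message is printed with the child's pid, not the parent's, so the side effect itself runs in the other process. This only shows where the log line can come from; it does not explain how the child could affect state in the test process.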
Some Observations:
This problem is always accompanied by this log entry (which I believe is
generated by the dag processor reloading settings):
```
[2021-11-29 19:14:45,094] {settings.py:52} INFO - Configured default
timezone Timezone('UTC')
```
This entry appears asynchronously at various places - sometimes during the
test, sometimes in `setup` (the latter proves that this is a side effect
of an earlier test and not generated by the test itself). Depending on when
the "reload" happens, the effects on the test might be slightly different.
* First of all, `test_scheduler_keeps_scheduling_pool_full` runs right
after `test_scheduler_multiprocessing_with_spawn_method` because that's the
sequence of the tests.
* I tested it by disabling the
`test_scheduler_multiprocessing_with_spawn_method` test. When the test is
enabled, the problem happens rather consistently (1/2 failures for each CI
build). When the test is disabled,
`test_scheduler_keeps_scheduling_pool_full` consistently works fine.
* I am (just now) running an experiment where I moved the `spawn` test case
after `test_scheduler_keeps_scheduling_pool_full` and I will see what
effect it has (I guess the tests that follow might start failing).
I see three interesting cases:
Reload happening after or around second DAG syncing - here only d2 was
missing:
```
---------------------------- Captured stdout setup
-----------------------------
[2021-11-29 19:14:44,996] {dagbag.py:500} INFO - Filling up the DagBag
from /dev/null
------------------------------ Captured log setup
------------------------------
INFO airflow.models.dagbag.DagBag:dagbag.py:500 Filling up the DagBag
from /dev/null
----------------------------- Captured stdout call
-----------------------------
[2021-11-29 19:14:45,011] {dag.py:2395} INFO - Sync 1 DAGs
[2021-11-29 19:14:45,015] {dag.py:2414} INFO - Creating ORM DAG for
test_scheduler_keeps_scheduling_pool_full_d1
[2021-11-29 19:14:45,021] {dag.py:2934} INFO - Setting next_dagrun for
test_scheduler_keeps_scheduling_pool_full_d1 to 2016-01-01T00:00:00+00:00
[2021-11-29 19:14:45,041] {dag.py:2395} INFO - Sync 1 DAGs
[2021-11-29 19:14:45,044] {dag.py:2414} INFO - Creating ORM DAG for
test_scheduler_keeps_scheduling_pool_full_d2
[2021-11-29 19:14:45,050] {dag.py:2934} INFO - Setting next_dagrun for
test_scheduler_keeps_scheduling_pool_full_d2 to 2016-01-01T00:00:00+00:00
[2021-11-29 19:14:45,094] {settings.py:52} INFO - Configured default
timezone Timezone('UTC') <--- HERE RELOAD HAPPENED
[2021-11-29 19:14:45,190] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-29 19:14:45,198] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-29 19:14:45,207] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-29 19:14:45,221] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-29 19:14:45,235] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-29 19:14:45,248] {scheduler_job.py:287} INFO - 2 tasks up for
execution:
```
Reload happening before DAG syncing (here both DAGs are missing):
```
---------------------------- Captured stdout setup
-----------------------------
[2021-11-28 17:37:58,370] {settings.py:52} INFO - Configured default
timezone Timezone('UTC') <-- HERE RELOAD HAPPENED
[2021-11-28 17:37:58,372] {dagbag.py:500} INFO - Filling up the DagBag
from /dev/null
------------------------------ Captured log setup
------------------------------
INFO airflow.models.dagbag.DagBag:dagbag.py:500 Filling up the DagBag
from /dev/null
----------------------------- Captured stdout call
-----------------------------
[2021-11-28 17:37:58,381] {dag.py:2395} INFO - Sync 1 DAGs
[2021-11-28 17:37:58,384] {dag.py:2414} INFO - Creating ORM DAG for
test_scheduler_keeps_scheduling_pool_full_d1
[2021-11-28 17:37:58,388] {dag.py:2928} INFO - Setting next_dagrun for
test_scheduler_keeps_scheduling_pool_full_d1 to 2016-01-01T00:00:00+00:00
[2021-11-28 17:37:58,403] {dag.py:2395} INFO - Sync 1 DAGs
[2021-11-28 17:37:58,406] {dag.py:2414} INFO - Creating ORM DAG for
test_scheduler_keeps_scheduling_pool_full_d2
[2021-11-28 17:37:58,410] {dag.py:2928} INFO - Setting next_dagrun for
test_scheduler_keeps_scheduling_pool_full_d2 to 2016-01-01T00:00:00+00:00
[2021-11-28 17:37:58,429] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
[2021-11-28 17:37:58,438] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
[2021-11-28 17:37:58,448] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
[2021-11-28 17:37:58,461] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
[2021-11-28 17:37:58,474] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d1 in DagBag/DB!
[2021-11-28 17:37:58,487] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-28 17:37:58,501] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-28 17:37:58,514] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-28 17:37:58,527] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-28 17:37:58,539] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
```
Reload happening between the two DAG syncs (here again d2 was missing):
```
---------------------------- Captured stdout setup
-----------------------------
[2021-11-28 13:41:46,975] {dagbag.py:500} INFO - Filling up the DagBag
from /dev/null
------------------------------ Captured log setup
------------------------------
INFO airflow.models.dagbag.DagBag:dagbag.py:500 Filling up the DagBag
from /dev/null
----------------------------- Captured stdout call
-----------------------------
[2021-11-28 13:41:46,993] {dag.py:2395} INFO - Sync 1 DAGs
[2021-11-28 13:41:46,998] {dag.py:2414} INFO - Creating ORM DAG for
test_scheduler_keeps_scheduling_pool_full_d1
[2021-11-28 13:41:47,004] {dag.py:2928} INFO - Setting next_dagrun for
test_scheduler_keeps_scheduling_pool_full_d1 to 2016-01-01T00:00:00+00:00
[2021-11-28 13:41:47,011] {settings.py:52} INFO - Configured default
timezone Timezone('UTC') <-- HERE RELOAD HAPPENS
[2021-11-28 13:41:47,030] {dag.py:2395} INFO - Sync 1 DAGs
[2021-11-28 13:41:47,035] {dag.py:2414} INFO - Creating ORM DAG for
test_scheduler_keeps_scheduling_pool_full_d2
[2021-11-28 13:41:47,042] {dag.py:2928} INFO - Setting next_dagrun for
test_scheduler_keeps_scheduling_pool_full_d2 to 2016-01-01T00:00:00+00:00
[2021-11-28 13:41:47,198] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-28 13:41:47,209] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-28 13:41:47,220] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-28 13:41:47,231] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
[2021-11-28 13:41:47,241] {scheduler_job.py:992} ERROR - Couldn't find dag
test_scheduler_keeps_scheduling_pool_full_d2 in DagBag/DB!
```
It does not look like it is tied to the exact moment of the log entry; I
believe it is caused by what happens right after the "Timezone" message is
logged. When I look at the code, right after re-importing the settings,
DagProcessor calls `initialize()`, and there are plenty of places where some
of those calls (`configure_orm()`?) might disrupt SQLAlchemy models stored
in the session.
```
def initialize():
    """Initialize Airflow with all the settings from this file"""
    configure_vars()
    prepare_syspath()
    import_local_settings()
    global LOGGING_CLASS_PATH
    LOGGING_CLASS_PATH = configure_logging()
    configure_adapters()
    # The webservers import this file from models.py with the default settings.
    configure_orm()
    configure_action_logging()

    # Ensure we close DB connections at scheduler and gunicorn worker terminations
    atexit.register(dispose_orm)
```
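To make the `configure_orm()` suspicion concrete, here is a minimal sketch of the general pattern. It is not Airflow code and it assumes the re-initialization runs in the same process as the code holding the old objects. `FakeSession` and the simplified `configure_orm()` / `initialize()` below are hypothetical stand-ins: they only show that re-running an initializer that rebinds a module-level global leaves earlier references pointing at a stale object.

```python
class FakeSession:
    """Hypothetical stand-in for an ORM session that tracks pending objects."""

    def __init__(self):
        self.pending = []

    def add(self, obj):
        self.pending.append(obj)


# Module-level global, rebound on every configure_orm() call.
Session = None


def configure_orm():
    """Simplified stand-in: replace the global session with a fresh one."""
    global Session
    Session = FakeSession()


def initialize():
    """Simplified stand-in for an initializer that reconfigures the ORM."""
    configure_orm()


def demo():
    """Return (old pending, new pending, same object?) after a re-initialize."""
    initialize()
    old = Session
    old.add("d2")      # state recorded on the first session object
    initialize()       # "reload": the global now points at a new object
    return old.pending, Session.pending, old is Session


if __name__ == "__main__":
    print(demo())
```

After the second `initialize()`, the object added through the old reference is not visible through the new global. Whether anything like this can happen across a `spawn` boundary is exactly the open question here.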
However, I have failed to come up with a theory of why such a reload in a
spawned process would cause this. And why is the DagProcessor not stopped by
this code in the pytest fixture?
```
if self.scheduler_job and self.scheduler_job.processor_agent:
    self.scheduler_job.processor_agent.end()
    self.scheduler_job = None
```
Any help and brainstorming appreciated.