[ https://issues.apache.org/jira/browse/AIRFLOW-4401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16826749#comment-16826749 ]
Jarek Potiuk commented on AIRFLOW-4401:
---------------------------------------

I saw at least one case where this fix could have an unforeseen side effect. I created the AIRFLOW-4416 Jira issue for it and opened a PR to revert the fix from master for now. While I think the root cause is correct, we need more time to analyse it and come up with a better fix.

[~ash] - I believe you have not yet merged this into the v1-10-test/stable branches. I think it's safer not to include it until this is solved and tested more thoroughly.

> multiprocessing.Queue.empty() is unreliable
> -------------------------------------------
>
>                 Key: AIRFLOW-4401
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-4401
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Jarek Potiuk
>            Priority: Major
>             Fix For: 1.10.4
>
>
> After discussing potential reasons for the flakiness of the LocalExecutor
> tests with [~ash] and [~BasPH], I took a deeper dive into the problem, and
> what I found raised the remaining hair on top of my head.
> We had a number of flaky LocalExecutor tests in which result_queue was not
> empty even though it should have been emptied a moment before. More details
> and discussion can be found in [https://github.com/apache/airflow/pull/5159]
> The problem turned out to be ... the unreliability of the
> multiprocessing.Queue.empty() implementation. empty() is not fully
> synchronized, so it might return True even if a put() operation has already
> completed in another process. What's more, empty() might return True even
> when qsize() of the queue returns > 0 (!)
> It's a bit mind-boggling, but it is "as intended", as documented in
> [https://bugs.python.org/issue23582] (resolved as "not a bug"!).
> A few people have stumbled upon this problem.
> For example,
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> and [https://github.com/keras-team/autokeras/issues/368].
> We also seem to have experienced this in Airflow before. In jobs.py, years
> ago (31.07.2016), we can see the comment below (but we used
> multiprocessing.Queue.empty() nevertheless):
> {code:python}
> # Not using multiprocessing.Queue() since it's no longer a separate
> # process and due to some unusual behavior. (empty() incorrectly
> # returns true?)
> {code}
> The workaround suggested in [https://bugs.python.org/issue23582], using
> qsize(), works on Linux but is not really acceptable because qsize() does
> not work on macOS (it raises NotImplementedError).
> The working solution is to implement a reliable queue (SynchronizedQueue)
> based on
> [https://github.com/vterron/lemon/commit/9ca6b4b1212228dbd4f69b88aaf88b12952d7d6f]
> (but with a twist: the __init__ of a class deriving from Queue has to be
> changed for Python 3.4+, as described in
> [https://stackoverflow.com/questions/24941359/ctx-parameter-in-multiprocessing-queue]).
> Luckily, we are now on Python 3.5+.
> We should replace all usages of multiprocessing.Queue where empty() is used
> with the SynchronizedQueue, and make sure we do not use multiprocessing.Queue
> in a similar way in the future.


--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
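For illustration, a minimal sketch of a SynchronizedQueue, assuming the fix tracks the number of items in a shared counter instead of polling the underlying pipe. This is not the code from the PR; the helper `_add`, the attribute `_item_count` and the `_producer` function are hypothetical names. It passes `ctx` explicitly (required by `multiprocessing.queues.Queue.__init__` since Python 3.4) and overrides `qsize()`, so it also works on macOS, where the stock `qsize()` raises NotImplementedError.

```python
import multiprocessing
import multiprocessing.queues


class SynchronizedQueue(multiprocessing.queues.Queue):
    """Queue whose empty()/qsize() are backed by a shared item counter.

    Hypothetical sketch; not the implementation from the Airflow PR.
    """

    def __init__(self, maxsize=0, *, ctx=None):
        # Since Python 3.4, multiprocessing.queues.Queue requires a ctx
        # keyword argument, so subclasses must supply one explicitly.
        if ctx is None:
            ctx = multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)
        # Shared-memory integer, visible to every process using the queue.
        self._item_count = ctx.Value("i", 0)

    def __getstate__(self):
        # The base class only pickles its own fields when the queue is
        # passed to a child process, so append the counter explicitly.
        return super().__getstate__() + (self._item_count,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._item_count = state[-1]

    def _add(self, delta):
        # Hypothetical helper: adjust the shared counter under its lock.
        with self._item_count.get_lock():
            self._item_count.value += delta

    def put(self, obj, block=True, timeout=None):
        # Count the item before the feeder thread ships it through the pipe,
        # so empty() is False as soon as put() returns in any process.
        self._add(1)
        try:
            super().put(obj, block, timeout)
        except BaseException:
            self._add(-1)  # e.g. queue.Full: nothing was enqueued
            raise

    def get(self, block=True, timeout=None):
        obj = super().get(block, timeout)
        self._add(-1)  # only decrement once an item was actually received
        return obj

    def qsize(self):
        # Does not rely on sem_getvalue(), unlike the stock qsize().
        return self._item_count.value

    def empty(self):
        return self.qsize() == 0


def _producer(queue):
    queue.put("done")


if __name__ == "__main__":
    q = SynchronizedQueue()
    child = multiprocessing.Process(target=_producer, args=(q,))
    child.start()
    child.join()
    # The child's put() has returned, so the shared counter is already 1.
    print(q.empty(), q.qsize())  # False 1
    print(q.get(timeout=5))      # done
    print(q.empty())             # True
```

Because the counter is bumped inside put(), empty() becomes False as soon as put() returns in any process, but the item itself may still be in transit through the pipe. A consumer that sees empty() == False should therefore still use a blocking get() (or one with a timeout) rather than get_nowait().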