AMBARI-20070. Agent heartbeat lost due to subprocess.Popen race condition -- addendum for multiprocessing


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/19c9e255
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/19c9e255
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/19c9e255

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 19c9e25587316bceb46c1585fce712fdf5056e39
Parents: 9446252
Author: Attila Doroszlai <adorosz...@hortonworks.com>
Authored: Sat Mar 11 19:13:16 2017 +0100
Committer: Attila Doroszlai <adorosz...@hortonworks.com>
Committed: Mon Mar 13 22:21:59 2017 +0100

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/main.py        | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/19c9e255/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index ddef473..8e55916 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -45,23 +45,32 @@ def fix_subprocess_racecondition():
 
 def fix_subprocess_popen():
   '''
-  http://bugs.python.org/issue19809
+  Workaround for race condition in starting subprocesses concurrently from
+  multiple threads via the subprocess and multiprocessing modules.
+  See http://bugs.python.org/issue19809 for details and repro script.
   '''
   import os
   import sys
 
   if os.name == 'posix' and sys.version_info[0] < 3:
+    from multiprocessing import forking
     import subprocess
     import threading
 
-    original_init = subprocess.Popen.__init__
-    lock = threading.RLock()
+    sp_original_init = subprocess.Popen.__init__
+    mp_original_init = forking.Popen.__init__
+    lock = threading.RLock() # guards subprocess creation
 
-    def locked_init(self, *a, **kw):
+    def sp_locked_init(self, *a, **kw):
       with lock:
-        original_init(self, *a, **kw)
+        sp_original_init(self, *a, **kw)
 
-    subprocess.Popen.__init__ = locked_init
+    def mp_locked_init(self, *a, **kw):
+      with lock:
+        mp_original_init(self, *a, **kw)
+
+    subprocess.Popen.__init__ = sp_locked_init
+    forking.Popen.__init__ = mp_locked_init
 
 
 fix_subprocess_popen()

Reply via email to