AMBARI-20323. Commands timed-out on ambari host without any error logs - 
addendum patch (echekanskiy)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b69ac43a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b69ac43a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b69ac43a

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: b69ac43a6a7aa5c7810bca0bc1204e6641634c35
Parents: 0471b0c
Author: Eugene Chekanskiy <echekans...@hortonworks.com>
Authored: Sat Mar 11 00:08:27 2017 +0200
Committer: Eugene Chekanskiy <echekans...@hortonworks.com>
Committed: Sat Mar 11 00:08:27 2017 +0200

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/Controller.py  |  2 +-
 .../src/main/python/ambari_agent/ExitHelper.py  |  3 ++
 .../ambari_agent/StatusCommandsExecutor.py      | 36 ++++++++++++++++----
 .../src/main/python/ambari_agent/main.py        |  4 +--
 4 files changed, 35 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py 
b/ambari-agent/src/main/python/ambari_agent/Controller.py
index c152f64..c1a5f1b 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -477,7 +477,7 @@ class Controller(threading.Thread):
     try:
       self.actionQueue = ActionQueue(self.config, controller=self)
       self.statusCommandsExecutor = StatusCommandsExecutor(self.config, 
self.actionQueue)
-      ExitHelper().register(self.statusCommandsExecutor.kill, 
"CLEANUP_KILLING")
+      ExitHelper().register(self.statusCommandsExecutor.kill, 
"CLEANUP_KILLING", can_relaunch=False)
       self.actionQueue.start()
       self.register = Register(self.config)
       self.heartbeat = Heartbeat(self.actionQueue, self.config, 
self.alert_scheduler_handler.collector())

http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py 
b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
index e51646f..66e29e6 100644
--- a/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
+++ b/ambari-agent/src/main/python/ambari_agent/ExitHelper.py
@@ -39,6 +39,9 @@ class ExitHelper(object):
   """
   Class to cleanup resources before exiting. Replacement for atexit module. 
sys.exit(code) works only from threads and
   os._exit(code) will ignore atexit and cleanup will be ignored.
+
+  WARNING: always import as `from ambari_agent.ExitHelper import ExitHelper`, 
otherwise it will be imported twice and nothing
+  will work as expected.
   """
   __metaclass__ = _singleton
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py 
b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 3f7ef4c..5c1c54a 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -37,6 +37,9 @@ class StatusCommandsExecutor(object):
     self.config = config
     self.actionQueue = actionQueue
 
+    self._can_relaunch_lock = threading.RLock()
+    self._can_relaunch = True
+
     # used to prevent queues from been used during creation of new one to 
prevent threads messing up with combination of
     # old and new queues
     self.usage_lock = threading.RLock()
@@ -53,6 +56,16 @@ class StatusCommandsExecutor(object):
     self.mp_result_logs = multiprocessing.Queue()
     self.mp_task_queue = multiprocessing.Queue()
 
+  @property
+  def can_relaunch(self):
+    with self._can_relaunch_lock:
+      return self._can_relaunch
+
+  @can_relaunch.setter
+  def can_relaunch(self, value):
+    with self._can_relaunch_lock:
+      self._can_relaunch = value
+
   def _log_message(self, level, message, exception=None):
     """
     Put log message to logging queue. Must be used only for logging from child 
process(in _worker_process_target).
@@ -163,7 +176,7 @@ class StatusCommandsExecutor(object):
       self._log_message(logging.ERROR, "StatusCommandsExecutor process failed 
with exception:", e)
       raise
 
-    self._log_message(logging.WARN, "StatusCommandsExecutor subprocess 
finished")
+    self._log_message(logging.INFO, "StatusCommandsExecutor subprocess 
finished")
 
   def _set_timed_out(self, command):
     """
@@ -242,23 +255,32 @@ class StatusCommandsExecutor(object):
     :param reason: reason of restart
     :return:
     """
-    self.kill(reason)
-    self.worker_process = 
multiprocessing.Process(target=self._worker_process_target)
-    self.worker_process.start()
-    logger.info("Started process with pid {0}".format(self.worker_process.pid))
+    if self.can_relaunch:
+      self.kill(reason)
+      self.worker_process = 
multiprocessing.Process(target=self._worker_process_target)
+      self.worker_process.start()
+      logger.info("Started process with pid 
{0}".format(self.worker_process.pid))
+    else:
+      logger.debug("Relaunch does not allowed, can not relaunch")
 
-  def kill(self, reason=None):
+  def kill(self, reason=None, can_relaunch=True):
     """
     Tries to stop command executor internal process for sort time, otherwise 
killing it. Closing all possible queues to
     unblock threads that probably blocked on read or write operations to 
queues. Must be called from threads different
     from threads that calling read or write methods(get_log_messages, 
get_results, put_commands).
 
+    :param can_relaunch: indicates if StatusCommandsExecutor can be relaunched 
after this kill
     :param reason: reason of killing
     :return:
     """
+    logger.info("Killing child process reason:" + str(reason))
+    self.can_relaunch = can_relaunch
+
+    if not self.can_relaunch:
+      logger.info("Killing without possibility to relaunch...")
+
     # try graceful stop, otherwise hard-kill
     if self.worker_process and self.worker_process.is_alive():
-      logger.info("Killing child process reason:" + str(reason))
       self.mustDieEvent.set()
       self.worker_process.join(timeout=3)
       if self.worker_process.is_alive():

http://git-wip-us.apache.org/repos/asf/ambari/blob/b69ac43a/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py 
b/ambari-agent/src/main/python/ambari_agent/main.py
index ade9e4f..ddef473 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -89,7 +89,7 @@ from NetUtil import NetUtil
 from PingPortListener import PingPortListener
 import hostname
 from DataCleaner import DataCleaner
-from ExitHelper import ExitHelper
+from ambari_agent.ExitHelper import ExitHelper
 import socket
 from ambari_commons import OSConst, OSCheck
 from ambari_commons.shell import shellRunner
@@ -336,7 +336,7 @@ def run_threads(server_hostname, heartbeat_stop_callback):
     if controller.get_status_commands_executor().need_relaunch:
       
controller.get_status_commands_executor().relaunch("COMMAND_TIMEOUT_OR_KILLED")
 
-  controller.get_status_commands_executor().kill()
+  controller.get_status_commands_executor().kill("AGENT_STOPPED", 
can_relaunch=False)
 
 # event - event, that will be passed to Controller and NetUtil to make able to 
interrupt loops form outside process
 # we need this for windows os, where no sigterm available

Reply via email to