AMBARI-19520. Ambari agents do not recover from the heartbeat-lost state immediately after successfully re-registering with the server. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b512b26a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b512b26a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b512b26a Branch: refs/heads/branch-2.5 Commit: b512b26ae48c92df0b8d884c08f5f07cf9a2875b Parents: 36f7422 Author: Toader, Sebastian <stoa...@hortonworks.com> Authored: Mon Jan 16 13:43:01 2017 +0100 Committer: Toader, Sebastian <stoa...@hortonworks.com> Committed: Wed Jan 18 09:41:56 2017 +0100 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/Controller.py | 63 ++++++++++++++------ .../src/main/python/ambari_agent/main.py | 18 +++--- 2 files changed, 53 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/b512b26a/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index 2244d30..09ab1e6 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -86,6 +86,10 @@ class Controller(threading.Thread): self.max_reconnect_retry_delay = int(config.get('server','max_reconnect_retry_delay', default=30)) self.hasMappedComponents = True self.statusCommandsExecutor = None + + # this lock is used control which thread spawns/kills the StatusCommandExecutor child process + self.spawnKillStatusCommandExecutorLock = threading.RLock() + # Event is used for synchronizing heartbeat iterations (to make possible # manual wait() interruption between heartbeats ) self.heartbeat_stop_callback = heartbeat_stop_callback @@ -199,11 +203,9 @@ class Controller(threading.Thread): 
self.config.update_configuration_from_registration(ret) logger.debug("Updated config:" + str(self.config)) - if self.statusCommandsExecutor is None: - self.spawnStatusCommandsExecutorProcess() - elif self.statusCommandsExecutor.is_alive(): - logger.info("Terminating statusCommandsExecutor as agent re-registered with server.") - self.killStatusCommandsExecutorProcess() + # Start StatusCommandExecutor child process or restart it if already running + # in order to receive up to date agent config. + self.spawnStatusCommandsExecutorProcess() if 'statusCommands' in ret.keys(): logger.debug("Got status commands on registration.") @@ -458,22 +460,43 @@ class Controller(threading.Thread): self.DEBUG_STOP_HEARTBEATING=True def spawnStatusCommandsExecutorProcess(self): - # Re-create the status command queue as in case the consumer - # process is killed the queue may deadlock (see http://bugs.python.org/issue20527). - # The queue must be re-created by the producer process. - if self.actionQueue.statusCommandQueue is not None: - self.actionQueue.statusCommandQueue.close() - self.actionQueue.statusCommandQueue.join_thread() - - self.actionQueue.statusCommandQueue = multiprocessing.Queue() - - self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) - self.statusCommandsExecutor.start() + ''' + Starts a new StatusCommandExecutor child process. In case there is a running instance + already restarts it by simply killing it and starting new one. + This function is thread-safe. + ''' + with self.getSpawnKillStatusCommandExecutorLock(): + # if there is already an instance of StatusCommandExecutor kill it first + self.killStatusCommandsExecutorProcess() + + # Re-create the status command queue as in case the consumer + # process is killed the queue may deadlock (see http://bugs.python.org/issue20527). + # The queue must be re-created by the producer process. 
+ statusCommandQueue = self.actionQueue.statusCommandQueue + self.actionQueue.statusCommandQueue = multiprocessing.Queue() + + if statusCommandQueue is not None: + statusCommandQueue.close() + + logger.info("Spawning statusCommandsExecutor") + self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) + self.statusCommandsExecutor.start() def killStatusCommandsExecutorProcess(self): - self.statusCommandsExecutor.kill() - - + ''' + Kills the StatusExecutorChild process if exists. This function is thread-safe. + ''' + with self.getSpawnKillStatusCommandExecutorLock(): + if self.statusCommandsExecutor is not None and self.statusCommandsExecutor.is_alive(): + logger.info("Terminating statusCommandsExecutor.") + self.statusCommandsExecutor.kill() + + def getSpawnKillStatusCommandExecutorLock(self): + ''' + Re-entrant lock to be used to synchronize the spawning or killing of + StatusCommandExecutor child process in multi-thread environment. + ''' + return self.spawnKillStatusCommandExecutorLock; def getStatusCommandsExecutor(self): return self.statusCommandsExecutor @@ -586,6 +609,8 @@ class Controller(threading.Thread): except Exception, e: logger.info("Exception in move_data_dir_mount_file(). 
Error: {0}".format(str(e))) + + def main(argv=None): # Allow Ctrl-C http://git-wip-us.apache.org/repos/asf/ambari/blob/b512b26a/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 2e1124e..8e577a5 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -291,15 +291,15 @@ def run_threads(server_hostname, heartbeat_stop_callback): while controller.is_alive(): time.sleep(0.1) - if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()): - if controller.getStatusCommandsExecutor().is_alive(): - logger.info("Terminating statusCommandsExecutor") - controller.killStatusCommandsExecutorProcess() - logger.info("Respawning statusCommandsExecutor") - controller.spawnStatusCommandsExecutorProcess() - - if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive(): - controller.killStatusCommandsExecutorProcess() + with controller.getSpawnKillStatusCommandExecutorLock(): + # We need to lock as Controller.py may try to spawn StatusCommandExecutor child in parallel as well + if controller.getStatusCommandsExecutor() is not None \ + and (not controller.getStatusCommandsExecutor().is_alive() + or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()): + controller.spawnStatusCommandsExecutorProcess() + + + controller.killStatusCommandsExecutorProcess() # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops form outside process # we need this for windows os, where no sigterm available