AMBARI-19392. Status command executor may use obsolete settings. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b5d3e072
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b5d3e072
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b5d3e072

Branch: refs/heads/branch-2.5
Commit: b5d3e072fe44fb3ceb124b769310d8bb75e9c88a
Parents: d46461a
Author: Toader, Sebastian <stoa...@hortonworks.com>
Authored: Fri Jan 6 23:45:31 2017 +0100
Committer: Toader, Sebastian <stoa...@hortonworks.com>
Committed: Wed Jan 18 09:41:55 2017 +0100

----------------------------------------------------------------------
 .../src/main/python/ambari_agent/ActionQueue.py | 14 +++++++++++---
 .../src/main/python/ambari_agent/Controller.py  |  7 ++++++-
 2 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ambari/blob/b5d3e072/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
index aeae954..3726286 100644
--- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
+++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py
@@ -98,9 +98,17 @@ class ActionQueue(threading.Thread):
     return self._stop.isSet()
 
   def put_status(self, commands):
-    #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones
-    while not self.statusCommandQueue.empty():
-      self.statusCommandQueue.get()
+    if not self.statusCommandQueue.empty():
+      #Clear all status commands. Was supposed that we got all set of statuses, we don't need to keep old ones
+      statusCommandQueueSize = 0
+      try:
+        while not self.statusCommandQueue.empty():
+          self.statusCommandQueue.get(False)
+          statusCommandQueueSize = statusCommandQueueSize + 1
+      except Queue.Empty:
+        pass
+
+      logger.info("Number of status commands removed from queue : " + str(statusCommandQueueSize))
 
     for command in commands:
       logger.info("Adding " + command['commandType'] + " for component " + \

http://git-wip-us.apache.org/repos/asf/ambari/blob/b5d3e072/ambari-agent/src/main/python/ambari_agent/Controller.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py
index f6296d8..11b98f4 100644
--- a/ambari-agent/src/main/python/ambari_agent/Controller.py
+++ b/ambari-agent/src/main/python/ambari_agent/Controller.py
@@ -198,6 +198,12 @@ class Controller(threading.Thread):
         self.config.update_configuration_from_registration(ret)
         logger.debug("Updated config:" + str(self.config))
 
+        if self.statusCommandsExecutor is None:
+          self.spawnStatusCommandsExecutorProcess()
+        elif self.statusCommandsExecutor.is_alive():
+          logger.info("Terminating statusCommandsExecutor as agent re-registered with server.")
+          self.statusCommandsExecutor.kill()
+
         if 'statusCommands' in ret.keys():
           logger.debug("Got status commands on registration.")
           self.addToStatusQueue(ret['statusCommands'])
@@ -461,7 +467,6 @@ class Controller(threading.Thread):
     try:
       self.actionQueue = ActionQueue(self.config, controller=self)
       self.actionQueue.start()
-      self.spawnStatusCommandsExecutorProcess()
       self.register = Register(self.config)
       self.heartbeat = Heartbeat(self.actionQueue, self.config, self.alert_scheduler_handler.collector())