Repository: ambari Updated Branches: refs/heads/trunk 8fa580614 -> 1535bc982
AMBARI-19416. Ambari agents remain in heartbeat lost state after ambari server restart. (stoader) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/1535bc98 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/1535bc98 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/1535bc98 Branch: refs/heads/trunk Commit: 1535bc9823d7a9e58d1a3f195949a8c6d43b91d6 Parents: 8fa5806 Author: Toader, Sebastian <stoa...@hortonworks.com> Authored: Mon Jan 9 13:18:53 2017 +0100 Committer: Toader, Sebastian <stoa...@hortonworks.com> Committed: Mon Jan 9 13:18:53 2017 +0100 ---------------------------------------------------------------------- .../src/main/python/ambari_agent/ActionQueue.py | 5 ++++- .../src/main/python/ambari_agent/Controller.py | 17 ++++++++++++++++- ambari-agent/src/main/python/ambari_agent/main.py | 5 +++-- .../src/test/python/ambari_agent/TestHeartbeat.py | 2 ++ .../src/test/python/ambari_agent/TestMain.py | 4 +++- 5 files changed, 28 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/main/python/ambari_agent/ActionQueue.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py index 3726286..18d7c2a 100644 --- a/ambari-agent/src/main/python/ambari_agent/ActionQueue.py +++ b/ambari-agent/src/main/python/ambari_agent/ActionQueue.py @@ -75,7 +75,10 @@ class ActionQueue(threading.Thread): def __init__(self, config, controller): super(ActionQueue, self).__init__() self.commandQueue = Queue.Queue() - self.statusCommandQueue = multiprocessing.Queue() + self.statusCommandQueue = None # the queue this field points to is re-created whenever + # a new StatusCommandExecutor child process is spawned + # by Controller + # 
multiprocessing.Queue() self.statusCommandResultQueue = multiprocessing.Queue() # this queue is filled by StatusCommandsExecutor. self.backgroundCommandQueue = Queue.Queue() self.commandStatuses = CommandStatusDict(callback_action = http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/main/python/ambari_agent/Controller.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Controller.py b/ambari-agent/src/main/python/ambari_agent/Controller.py index f6bda1e..6b1b196 100644 --- a/ambari-agent/src/main/python/ambari_agent/Controller.py +++ b/ambari-agent/src/main/python/ambari_agent/Controller.py @@ -18,6 +18,7 @@ See the License for the specific language governing permissions and limitations under the License. ''' +import multiprocessing import logging import ambari_simplejson as json import sys @@ -202,7 +203,7 @@ class Controller(threading.Thread): self.spawnStatusCommandsExecutorProcess() elif self.statusCommandsExecutor.is_alive(): logger.info("Terminating statusCommandsExecutor as agent re-registered with server.") - self.statusCommandsExecutor.kill() + self.killStatusCommandsExecutorProcess() if 'statusCommands' in ret.keys(): logger.debug("Got status commands on registration.") @@ -457,9 +458,23 @@ class Controller(threading.Thread): self.DEBUG_STOP_HEARTBEATING=True def spawnStatusCommandsExecutorProcess(self): + # Re-create the status command queue as in case the consumer + # process is killed the queue may deadlock (see http://bugs.python.org/issue20527). + # The queue must be re-created by the producer process. 
+ if self.actionQueue.statusCommandQueue is not None: + self.actionQueue.statusCommandQueue.close() + self.actionQueue.statusCommandQueue.join_thread() + + self.actionQueue.statusCommandQueue = multiprocessing.Queue() + self.statusCommandsExecutor = StatusCommandsExecutor(self.config, self.actionQueue) self.statusCommandsExecutor.start() + def killStatusCommandsExecutorProcess(self): + self.statusCommandsExecutor.kill() + + + def getStatusCommandsExecutor(self): return self.statusCommandsExecutor http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index f812226..2e0517b 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -312,11 +312,12 @@ def run_threads(server_hostname, heartbeat_stop_callback): if controller.getStatusCommandsExecutor() is not None and (not controller.getStatusCommandsExecutor().is_alive() or controller.getStatusCommandsExecutor().hasTimeoutedEvent.is_set()): if controller.getStatusCommandsExecutor().is_alive(): logger.info("Terminating statusCommandsExecutor") - controller.getStatusCommandsExecutor().kill() + controller.killStatusCommandsExecutorProcess() logger.info("Respawning statusCommandsExecutor") controller.spawnStatusCommandsExecutorProcess() - controller.getStatusCommandsExecutor().kill() + if controller.getStatusCommandsExecutor() is not None and controller.getStatusCommandsExecutor().is_alive(): + controller.killStatusCommandsExecutorProcess() # event - event, that will be passed to Controller and NetUtil to make able to interrupt loops from outside process # we need this for windows os, where no sigterm available 
http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py index 19fad56..de07743 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py +++ b/ambari-agent/src/test/python/ambari_agent/TestHeartbeat.py @@ -24,6 +24,7 @@ import tempfile from mock.mock import patch, MagicMock, call import StringIO import sys +import multiprocessing from ambari_agent.RecoveryManager import RecoveryManager @@ -212,6 +213,7 @@ class TestHeartbeat(TestCase): dummy_controller = MagicMock() actionQueue = ActionQueue(config, dummy_controller) + actionQueue.statusCommandQueue = multiprocessing.Queue() statusCommand = { "serviceName" : 'HDFS', "commandType" : "STATUS_COMMAND", http://git-wip-us.apache.org/repos/asf/ambari/blob/1535bc98/ambari-agent/src/test/python/ambari_agent/TestMain.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestMain.py b/ambari-agent/src/test/python/ambari_agent/TestMain.py index 6f38410..2deca20 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestMain.py +++ b/ambari-agent/src/test/python/ambari_agent/TestMain.py @@ -341,6 +341,7 @@ class TestMain(unittest.TestCase): @patch.object(Controller, "is_alive") @patch.object(Controller, "start") @patch.object(Controller, "getStatusCommandsExecutor") + @patch.object(Controller, "killStatusCommandsExecutorProcess") @patch("optparse.OptionParser.parse_args") @patch.object(DataCleaner,"start") @patch.object(DataCleaner,"__init__") @@ -349,7 +350,8 @@ class TestMain(unittest.TestCase): @patch.object(ExitHelper,"execute_cleanup") @patch.object(ExitHelper, "exit") def test_main(self, exithelper_exit_mock, cleanup_mock, ping_port_init_mock, ping_port_start_mock, 
data_clean_init_mock,data_clean_start_mock, - parse_args_mock, start_mock, Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, + parse_args_mock, start_mock, Controller_killStatusCommandsExecutorProcess, + Controller_getStatusCommandsExecutor, Controller_is_alive_mock, Controller_init_mock, try_to_connect_mock, update_log_level_mock, daemonize_mock, perform_prestart_checks_mock, ambari_config_mock, stop_mock, bind_signal_handlers_mock,