Repository: ambari Updated Branches: refs/heads/trunk fded5a1ae -> 7d875fbac
AMBARI-6048. Ambari Agent script should check for running processes before starting (dlysnichenko) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7d875fba Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7d875fba Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7d875fba Branch: refs/heads/trunk Commit: 7d875fbac7d648e79da8a16ec3d9389bd3ccdbe2 Parents: fded5a1 Author: Lisnichenko Dmitro <dlysniche...@hortonworks.com> Authored: Wed Jun 11 17:05:49 2014 +0300 Committer: Lisnichenko Dmitro <dlysniche...@hortonworks.com> Committed: Wed Jun 11 17:05:49 2014 +0300 ---------------------------------------------------------------------- .../python/ambari_agent/PingPortListener.py | 32 +++++++++++++------- .../src/main/python/ambari_agent/main.py | 8 ++++- .../python/ambari_agent/TestPingPortListener.py | 28 ++++++++++++----- 3 files changed, 49 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/7d875fba/ambari-agent/src/main/python/ambari_agent/PingPortListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/PingPortListener.py b/ambari-agent/src/main/python/ambari_agent/PingPortListener.py index f24b78a..4b2c5fb 100644 --- a/ambari-agent/src/main/python/ambari_agent/PingPortListener.py +++ b/ambari-agent/src/main/python/ambari_agent/PingPortListener.py @@ -23,12 +23,15 @@ import logging import AmbariConfig import threading import socket +import subprocess logger = logging.getLogger() +FUSER_CMD = "fuser {0}/tcp 2>&1 | awk '{1}'" +PSPF_CMD = "ps -fp {0}" +PORT_IN_USE_MESSAGE = "Could not open port {0} because port already used by another process:\n{1}" class PingPortListener(threading.Thread): - def __init__(self, config): threading.Thread.__init__(self) self.daemon = True @@ -36,17 +39,24 @@ class PingPortListener(threading.Thread): self.config = config self.host = '0.0.0.0' self.port = int(self.config.get('agent','ping_port')) - try: - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.bind((self.host, self.port)) - self.socket.listen(1) - except Exception as ex: - logger.error("Failed to start ping port listener of:" + str(ex)); - sys.exit(1) - else: - config.set('agent','current_ping_port',str(self.socket.getsockname()[1])) - logger.info("Ping port listener started on port: " + str(self.socket.getsockname()[1])) + if not self.port == None and not self.port == 0: + (stdoutdata, stderrdata) = self.run_os_command_in_shell(FUSER_CMD.format(str(self.port), "{print $2}")) + if stdoutdata.strip(): + (stdoutdata, stderrdata) = self.run_os_command_in_shell(PSPF_CMD.format(stdoutdata.strip())) + raise Exception(PORT_IN_USE_MESSAGE.format(str(self.port), stdoutdata)) + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.bind((self.host, self.port)) + self.socket.listen(1) + config.set('agent','current_ping_port',str(self.socket.getsockname()[1])) + logger.info("Ping port listener started on port: " + str(self.socket.getsockname()[1])) + + def run_os_command_in_shell(self, command): + process = subprocess.Popen(command, stdout=subprocess.PIPE, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=True) + return process.communicate() def __del__(self): logger.info("Ping port listener killed") http://git-wip-us.apache.org/repos/asf/ambari/blob/7d875fba/ambari-agent/src/main/python/ambari_agent/main.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py index 67b47b3..78dde9e 100644 --- a/ambari-agent/src/main/python/ambari_agent/main.py +++ b/ambari-agent/src/main/python/ambari_agent/main.py @@ -211,7 +211,13 @@ def main(): daemonize() # Starting ping port listener - ping_port_listener = PingPortListener(config) + try: + ping_port_listener = PingPortListener(config) + except Exception as ex: + err_message = "Failed to start ping port listener of: " + str(ex) + logger.error(err_message); + sys.stderr.write(err_message) + sys.exit(1) ping_port_listener.start() update_log_level(config) http://git-wip-us.apache.org/repos/asf/ambari/blob/7d875fba/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py b/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py index 39277d5..0a36cce 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py +++ b/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py @@ -31,10 +31,17 @@ class TestPingPortListener(unittest.TestCase): self.config.get.return_value = 55000 PingPortListener.logger = MagicMock() + @patch("subprocess.Popen") @patch("socket.socket") - def test_init_success(self,socketMock): + def test_init_success(self,socketMock,popen_mock): + procObj = MagicMock() + procObj.communicate = MagicMock() + procObj.communicate.return_value = {"": 0, "log": "log"} + popen_mock.return_value = procObj PingPortListener.logger.reset_mock() + popen_mock.reset_mock() allive_daemon = PingPortListener.PingPortListener(self.config) + self.assertTrue(popen_mock.called) self.assertFalse(PingPortListener.logger.warn.called) self.assertTrue(socketMock.call_args_list[0][0][0] == socket.AF_INET) self.assertTrue(socketMock.call_args_list[0][0][1] == socket.SOCK_STREAM) @@ -45,15 +52,22 @@ class TestPingPortListener(unittest.TestCase): + @patch("subprocess.Popen") @patch.object(socket.socket,"bind") @patch.object(socket.socket,"listen") - @patch.object(socket.socket,"__init__") - @patch.object(sys, "exit") - def test_init_warn(self, sys_exit_mock, socketInitMock,socketListenMock,socketBindMock): + def test_init_warn(self,socketListenMock,socketBindMock,popen_mock): + procObj = MagicMock() + procObj.communicate = MagicMock() + procObj.communicate.return_value = {"mine.py": 0, "log": "log"} + popen_mock.return_value = procObj PingPortListener.logger.reset_mock() - allive_daemon = PingPortListener.PingPortListener(self.config) - self.assertTrue(socketInitMock.called) - self.assertTrue(sys_exit_mock.called) + try: + PingPortListener.PingPortListener(self.config) + self.fail("Should throw exception") + except Exception as fe: + # Expected + self.assertTrue("port already used" in str(fe)) + pass if __name__ == "__main__": suite = unittest.TestLoader().loadTestsFromTestCase(PingPortListener)