Repository: ambari
Updated Branches:
  refs/heads/trunk fded5a1ae -> 7d875fbac


AMBARI-6048. Ambari Agent script should check for running processes before starting (dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7d875fba
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7d875fba
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7d875fba

Branch: refs/heads/trunk
Commit: 7d875fbac7d648e79da8a16ec3d9389bd3ccdbe2
Parents: fded5a1
Author: Lisnichenko Dmitro <dlysniche...@hortonworks.com>
Authored: Wed Jun 11 17:05:49 2014 +0300
Committer: Lisnichenko Dmitro <dlysniche...@hortonworks.com>
Committed: Wed Jun 11 17:05:49 2014 +0300

----------------------------------------------------------------------
 .../python/ambari_agent/PingPortListener.py     | 32 +++++++++++++-------
 .../src/main/python/ambari_agent/main.py        |  8 ++++-
 .../python/ambari_agent/TestPingPortListener.py | 28 ++++++++++++-----
 3 files changed, 49 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7d875fba/ambari-agent/src/main/python/ambari_agent/PingPortListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/PingPortListener.py b/ambari-agent/src/main/python/ambari_agent/PingPortListener.py
index f24b78a..4b2c5fb 100644
--- a/ambari-agent/src/main/python/ambari_agent/PingPortListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/PingPortListener.py
@@ -23,12 +23,15 @@ import logging
 import AmbariConfig
 import threading
 import socket
+import subprocess
 
 logger = logging.getLogger()
+FUSER_CMD = "fuser {0}/tcp 2>&1 | awk '{1}'"
+PSPF_CMD = "ps -fp {0}"
+PORT_IN_USE_MESSAGE = "Could not open port {0} because port already used by another process:\n{1}"
 
 class PingPortListener(threading.Thread):
 
-
   def __init__(self, config):
     threading.Thread.__init__(self)
     self.daemon = True
@@ -36,17 +39,24 @@ class PingPortListener(threading.Thread):
     self.config = config
     self.host = '0.0.0.0'
     self.port = int(self.config.get('agent','ping_port'))
-    try:
-      self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-      self.socket.bind((self.host, self.port))
-      self.socket.listen(1)
-    except Exception as ex:
-      logger.error("Failed to start ping port listener of:" + str(ex));
-      sys.exit(1)
-    else:
-      config.set('agent','current_ping_port',str(self.socket.getsockname()[1]))
-      logger.info("Ping port listener started on port: " + str(self.socket.getsockname()[1]))
+    if not self.port == None and not self.port == 0:
+      (stdoutdata, stderrdata) = self.run_os_command_in_shell(FUSER_CMD.format(str(self.port), "{print $2}"))
+      if stdoutdata.strip():
+        (stdoutdata, stderrdata) = self.run_os_command_in_shell(PSPF_CMD.format(stdoutdata.strip()))
+        raise Exception(PORT_IN_USE_MESSAGE.format(str(self.port), stdoutdata))      
+    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self.socket.bind((self.host, self.port))
+    self.socket.listen(1)
+    config.set('agent','current_ping_port',str(self.socket.getsockname()[1]))
+    logger.info("Ping port listener started on port: " + str(self.socket.getsockname()[1]))
+
 
+  def run_os_command_in_shell(self, command):
+    process = subprocess.Popen(command, stdout=subprocess.PIPE,
+              stdin=subprocess.PIPE,
+              stderr=subprocess.PIPE,
+              shell=True)
+    return process.communicate()
 
   def __del__(self):
     logger.info("Ping port listener killed")

http://git-wip-us.apache.org/repos/asf/ambari/blob/7d875fba/ambari-agent/src/main/python/ambari_agent/main.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/main.py b/ambari-agent/src/main/python/ambari_agent/main.py
index 67b47b3..78dde9e 100644
--- a/ambari-agent/src/main/python/ambari_agent/main.py
+++ b/ambari-agent/src/main/python/ambari_agent/main.py
@@ -211,7 +211,13 @@ def main():
   daemonize()
 
   # Starting ping port listener
-  ping_port_listener = PingPortListener(config)
+  try:
+    ping_port_listener = PingPortListener(config)
+  except Exception as ex:
+    err_message = "Failed to start ping port listener of: " + str(ex)
+    logger.error(err_message);
+    sys.stderr.write(err_message)
+    sys.exit(1)
   ping_port_listener.start()
 
   update_log_level(config)

http://git-wip-us.apache.org/repos/asf/ambari/blob/7d875fba/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py b/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py
index 39277d5..0a36cce 100644
--- a/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py
+++ b/ambari-agent/src/test/python/ambari_agent/TestPingPortListener.py
@@ -31,10 +31,17 @@ class TestPingPortListener(unittest.TestCase):
     self.config.get.return_value = 55000
     PingPortListener.logger = MagicMock()
 
+  @patch("subprocess.Popen")
   @patch("socket.socket")
-  def test_init_success(self,socketMock):
+  def test_init_success(self,socketMock,popen_mock):
+    procObj = MagicMock()
+    procObj.communicate = MagicMock()
+    procObj.communicate.return_value = {"": 0, "log": "log"}
+    popen_mock.return_value = procObj
     PingPortListener.logger.reset_mock()
+    popen_mock.reset_mock()
     allive_daemon = PingPortListener.PingPortListener(self.config)
+    self.assertTrue(popen_mock.called)
     self.assertFalse(PingPortListener.logger.warn.called)
     self.assertTrue(socketMock.call_args_list[0][0][0] == socket.AF_INET)
     self.assertTrue(socketMock.call_args_list[0][0][1] == socket.SOCK_STREAM)
@@ -45,15 +52,22 @@ class TestPingPortListener(unittest.TestCase):
 
 
 
+  @patch("subprocess.Popen")
   @patch.object(socket.socket,"bind")
   @patch.object(socket.socket,"listen")
-  @patch.object(socket.socket,"__init__")
-  @patch.object(sys, "exit")
-  def test_init_warn(self, sys_exit_mock, socketInitMock,socketListenMock,socketBindMock):
+  def test_init_warn(self,socketListenMock,socketBindMock,popen_mock):
+    procObj = MagicMock()
+    procObj.communicate = MagicMock()
+    procObj.communicate.return_value = {"mine.py": 0, "log": "log"}
+    popen_mock.return_value = procObj
     PingPortListener.logger.reset_mock()
-    allive_daemon = PingPortListener.PingPortListener(self.config)
-    self.assertTrue(socketInitMock.called)
-    self.assertTrue(sys_exit_mock.called)
+    try:
+      PingPortListener.PingPortListener(self.config)
+      self.fail("Should throw exception")
+    except Exception as fe:
+      # Expected
+      self.assertTrue("port already used" in str(fe))
+      pass
 
 if __name__ == "__main__":
   suite = unittest.TestLoader().loadTestsFromTestCase(PingPortListener)

Reply via email to