[04/34] ambari git commit: AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty)

2017-04-17 Thread ncole
AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene 
Chekanskiy via smohanty)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5ef0c99a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5ef0c99a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5ef0c99a

Branch: refs/heads/branch-feature-AMBARI-12556
Commit: 5ef0c99a9d477b63f4e7213d058c9ab2d3ac2feb
Parents: ef34cb4
Author: Sumit Mohanty 
Authored: Wed Apr 12 12:35:15 2017 -0700
Committer: Sumit Mohanty 
Committed: Wed Apr 12 12:35:15 2017 -0700

--
 .../ambari_agent/StatusCommandsExecutor.py  | 279 +--
 1 file changed, 2 insertions(+), 277 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/ambari/blob/5ef0c99a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
--
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 142e7ca..f42e134 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -83,280 +83,5 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
   def kill(self, reason=None, can_relaunch=True):
     pass
 
-class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
-  def __init__(self, config, actionQueue):
-    self.config = config
-    self.actionQueue = actionQueue
-
-    self.can_relaunch = True
-
-    # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
-    # old and new queues
-    self.usage_lock = threading.RLock()
-    # protects against simultaneous killing/creating from different threads.
-    self.kill_lock = threading.RLock()
-
-    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
-    self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-
-    self.worker_process = None
-    self.mustDieEvent = multiprocessing.Event()
-    self.timedOutEvent = multiprocessing.Event()
-
-    # multiprocessing stuff that need to be cleaned every time
-    self.mp_result_queue = multiprocessing.Queue()
-    self.mp_result_logs = multiprocessing.Queue()
-    self.mp_task_queue = multiprocessing.Queue()
-
-  def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
-    """
-    Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains
-    extremely dumb protection against blocking too much at this method: will try to get all possible items for not more
-    than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised
-    ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read
-    ~4500 ``range(1,1)`` objects for 5 seconds. So don't fill queue too fast.
-
-    :param target_queue: queue to read from
-    :param max_time: maximum time to spend in this method call
-    :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
-    :param read_break: time to wait before next read cycle iteration
-    :return: list of resulting objects
-    """
-    results = []
-    _empty = 0
-    _start = time.time()
-    with self.usage_lock:
-      try:
-        while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
-          try:
-            results.append(target_queue.get(False))
-            _empty = 0
-            time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
-          except Queue.Empty:
-            _empty += 1
-          except IOError:
-            pass
-          except UnicodeDecodeError:
-            pass
-      except IOError:
-        pass
-    return results
-
-  def _log_message(self, level, message, exception=None):
-    """
-    Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
-
-    :param level:
-    :param message:
-    :param exception:
-    :return:
-    """
-    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
-    self.mp_result_logs.put((level, result_message, exception))
-
-  def _process_logs(self):
-    """
-    Get all available at this moment logs and prints them to logger.
-    """
-    for level, message, exception in self._drain_queue(self.mp_result_logs):
-      if level == logging.ERROR:
-        logger.debug(message, exc_info=exception)
-      if level == logging.WARN:
-        logger.warn(message)
-

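The _drain_queue docstring above describes the pattern being removed: qsize() and empty() on a multiprocessing.Queue are only advisory, so the read loop is bounded both by a wall-clock budget (max_time) and by a run of consecutive empty non-blocking gets (max_empty_count). Below is a minimal standalone sketch of that pattern; it is not the Ambari code, and the drain_queue name and the demo values are illustrative only.

import multiprocessing
import time

try:
    import Queue as queue_lib    # Python 2, as in the Ambari agent code above
except ImportError:
    import queue as queue_lib    # Python 3

def drain_queue(target_queue, max_time=5, max_empty_count=15, read_break=0.001):
    # Pull whatever is readable right now, but never spend more than max_time
    # seconds and give up after max_empty_count consecutive empty reads.
    results = []
    empty_reads = 0
    start = time.time()
    while time.time() - start < max_time and empty_reads < max_empty_count:
        try:
            results.append(target_queue.get(False))  # non-blocking get
            empty_reads = 0
            time.sleep(read_break)  # let the queue's feeder thread catch up
        except queue_lib.Empty:
            empty_reads += 1
    return results

if __name__ == '__main__':
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)
    time.sleep(0.1)  # give the background feeder time to flush items into the pipe
    print(drain_queue(q))
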
ambari git commit: AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty)

2017-04-12 Thread smohanty
Repository: ambari
Updated Branches:
  refs/heads/branch-2.5 05b429714 -> e5940693b


AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene 
Chekanskiy via smohanty)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e5940693
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e5940693
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e5940693

Branch: refs/heads/branch-2.5
Commit: e5940693b968ba2cc0e84af3cce4a80ad013cfef
Parents: 05b4297
Author: Sumit Mohanty 
Authored: Wed Apr 12 12:35:15 2017 -0700
Committer: Sumit Mohanty 
Committed: Wed Apr 12 12:38:00 2017 -0700

--
 .../ambari_agent/StatusCommandsExecutor.py  | 278 +--
 1 file changed, 1 insertion(+), 277 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/ambari/blob/e5940693/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
--
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 5a8e4ce..b2a65ff 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -83,280 +83,4 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
   def kill(self, reason=None, can_relaunch=True):
     pass
 
-class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
-  def __init__(self, config, actionQueue):
-    self.config = config
-    self.actionQueue = actionQueue
-
-    self.can_relaunch = True
-
-    # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
-    # old and new queues
-    self.usage_lock = threading.RLock()
-    # protects against simultaneous killing/creating from different threads.
-    self.kill_lock = threading.RLock()
-
-    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
-    self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-
-    self.worker_process = None
-    self.mustDieEvent = multiprocessing.Event()
-    self.timedOutEvent = multiprocessing.Event()
-
-    # multiprocessing stuff that need to be cleaned every time
-    self.mp_result_queue = multiprocessing.Queue()
-    self.mp_result_logs = multiprocessing.Queue()
-    self.mp_task_queue = multiprocessing.Queue()
-
-  def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
-    """
-    Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains
-    extremely dumb protection against blocking too much at this method: will try to get all possible items for not more
-    than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised
-    ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read
-    ~4500 ``range(1,1)`` objects for 5 seconds. So don't fill queue too fast.
-
-    :param target_queue: queue to read from
-    :param max_time: maximum time to spend in this method call
-    :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
-    :param read_break: time to wait before next read cycle iteration
-    :return: list of resulting objects
-    """
-    results = []
-    _empty = 0
-    _start = time.time()
-    with self.usage_lock:
-      try:
-        while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
-          try:
-            results.append(target_queue.get(False))
-            _empty = 0
-            time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
-          except Queue.Empty:
-            _empty += 1
-          except IOError:
-            pass
-          except UnicodeDecodeError:
-            pass
-      except IOError:
-        pass
-    return results
-
-  def _log_message(self, level, message, exception=None):
-    """
-    Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
-
-    :param level:
-    :param message:
-    :param exception:
-    :return:
-    """
-    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
-    self.mp_result_logs.put((level, result_message, exception))
-
-  def _process_logs(self):
-    """
-    Get all available at this moment logs and prints them to logger.
-    """
-    for level, message, exception in self._drain_queue(self.mp_result_logs):
-      if level == logging.ERROR:
-        logger.debug(message, exc_info=exception)

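The removed _log_message/_process_logs pair forwarded log records across the process boundary: the worker process must not touch the parent's log handlers, so it enqueues (level, message, exception) tuples on a multiprocessing.Queue and the agent's main process drains that queue and hands the entries to the logging module. The following is a rough standalone sketch of the same idea; the worker and forward_logs names and the sentinel-based drain are assumptions for the example, not Ambari's implementation.

import logging
import multiprocessing

logger = logging.getLogger(__name__)

def worker(log_queue):
    # Child process: never use the parent's handlers, only enqueue tuples.
    try:
        raise ValueError("simulated status command failure")
    except ValueError as exc:
        log_queue.put((logging.ERROR, "status command failed", exc))
    log_queue.put((logging.WARN, "worker finished", None))
    log_queue.put(None)  # sentinel: nothing more to report

def forward_logs(log_queue):
    # Parent process: drain the queue and hand entries to the real logger.
    while True:
        entry = log_queue.get()
        if entry is None:
            break
        level, message, exception = entry
        logger.log(level, "%s (%r)", message, exception)

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    forward_logs(q)
    p.join()
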
ambari git commit: AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene Chekanskiy via smohanty)

2017-04-12 Thread smohanty
Repository: ambari
Updated Branches:
  refs/heads/trunk ef34cb4ee -> 5ef0c99a9


AMBARI-20750. Remove multiprocessing-based StatusCommandsExecutor (Eugene 
Chekanskiy via smohanty)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5ef0c99a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5ef0c99a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5ef0c99a

Branch: refs/heads/trunk
Commit: 5ef0c99a9d477b63f4e7213d058c9ab2d3ac2feb
Parents: ef34cb4
Author: Sumit Mohanty 
Authored: Wed Apr 12 12:35:15 2017 -0700
Committer: Sumit Mohanty 
Committed: Wed Apr 12 12:35:15 2017 -0700

--
 .../ambari_agent/StatusCommandsExecutor.py  | 279 +--
 1 file changed, 2 insertions(+), 277 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/ambari/blob/5ef0c99a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
--
diff --git a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
index 142e7ca..f42e134 100644
--- a/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/StatusCommandsExecutor.py
@@ -83,280 +83,5 @@ class SingleProcessStatusCommandsExecutor(StatusCommandsExecutor):
   def kill(self, reason=None, can_relaunch=True):
     pass
 
-class MultiProcessStatusCommandsExecutor(StatusCommandsExecutor):
-  def __init__(self, config, actionQueue):
-    self.config = config
-    self.actionQueue = actionQueue
-
-    self.can_relaunch = True
-
-    # used to prevent queues from been used during creation of new one to prevent threads messing up with combination of
-    # old and new queues
-    self.usage_lock = threading.RLock()
-    # protects against simultaneous killing/creating from different threads.
-    self.kill_lock = threading.RLock()
-
-    self.status_command_timeout = int(self.config.get('agent', 'status_command_timeout', 5))
-    self.customServiceOrchestrator = self.actionQueue.customServiceOrchestrator
-
-    self.worker_process = None
-    self.mustDieEvent = multiprocessing.Event()
-    self.timedOutEvent = multiprocessing.Event()
-
-    # multiprocessing stuff that need to be cleaned every time
-    self.mp_result_queue = multiprocessing.Queue()
-    self.mp_result_logs = multiprocessing.Queue()
-    self.mp_task_queue = multiprocessing.Queue()
-
-  def _drain_queue(self, target_queue, max_time=5, max_empty_count=15, read_break=.001):
-    """
-    Read everything that available in queue. Using not reliable multiprocessing.Queue methods(qsize, empty), so contains
-    extremely dumb protection against blocking too much at this method: will try to get all possible items for not more
-    than ``max_time`` seconds; will return after ``max_empty_count`` calls of ``target_queue.get(False)`` that raised
-    ``Queue.Empty`` exception. Notice ``read_break`` argument, with default values this method will be able to read
-    ~4500 ``range(1,1)`` objects for 5 seconds. So don't fill queue too fast.
-
-    :param target_queue: queue to read from
-    :param max_time: maximum time to spend in this method call
-    :param max_empty_count: maximum allowed ``Queue.Empty`` in a row
-    :param read_break: time to wait before next read cycle iteration
-    :return: list of resulting objects
-    """
-    results = []
-    _empty = 0
-    _start = time.time()
-    with self.usage_lock:
-      try:
-        while (not target_queue.empty() or target_queue.qsize() > 0) and time.time() - _start < max_time and _empty < max_empty_count:
-          try:
-            results.append(target_queue.get(False))
-            _empty = 0
-            time.sleep(read_break) # sleep a little to get more accurate empty and qsize results
-          except Queue.Empty:
-            _empty += 1
-          except IOError:
-            pass
-          except UnicodeDecodeError:
-            pass
-      except IOError:
-        pass
-    return results
-
-  def _log_message(self, level, message, exception=None):
-    """
-    Put log message to logging queue. Must be used only for logging from child process(in _worker_process_target).
-
-    :param level:
-    :param message:
-    :param exception:
-    :return:
-    """
-    result_message = "StatusCommandExecutor reporting at {0}: ".format(time.time()) + message
-    self.mp_result_logs.put((level, result_message, exception))
-
-  def _process_logs(self):
-    """
-    Get all available at this moment logs and prints them to logger.
-    """
-    for level, message, exception in self._drain_queue(self.mp_result_logs):
-      if level == logging.ERROR:
-        logger.debug(message, exc_info=exception)
-
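The removed __init__ also created two multiprocessing.Event objects, mustDieEvent and timedOutEvent, to coordinate with the worker process. The commit does not show how they were consumed, so the following is only a generic sketch of cooperative shutdown with such events; the worker_loop name, the deadline value, and the control flow are illustrative assumptions rather than the removed implementation.

import multiprocessing
import time

def worker_loop(must_die_event, timed_out_event, deadline=2.0):
    # Child process: keep "working" until the parent asks us to exit, or set
    # the timed-out flag and stop once the deadline is exceeded.
    start = time.time()
    while not must_die_event.is_set():
        if time.time() - start > deadline:
            timed_out_event.set()
            break
        time.sleep(0.1)  # stand-in for executing one status command

if __name__ == '__main__':
    must_die = multiprocessing.Event()
    timed_out = multiprocessing.Event()
    p = multiprocessing.Process(target=worker_loop, args=(must_die, timed_out))
    p.start()
    time.sleep(0.5)
    must_die.set()       # cooperative shutdown request instead of terminate()
    p.join(timeout=5)
    print("worker timed out: %s" % timed_out.is_set())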