Repository: incubator-eagle
Updated Branches:
  refs/heads/master 7c6315311 -> f5f07402d
[EAGLE-838] Use subprocess for metric collector to avoid defunct process

Use subprocess for metric collector to avoid defunct process

https://issues.apache.org/jira/browse/EAGLE-838

Author: Hao Chen <h...@apache.org>

Closes #737 from haoch/UseSubProcessForMetricCollector.

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f5f07402
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f5f07402
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f5f07402

Branch: refs/heads/master
Commit: f5f07402d40c19962ea6650092509c32aafb12e6
Parents: 7c63153
Author: Hao Chen <h...@apache.org>
Authored: Tue Dec 13 17:00:00 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Tue Dec 13 17:00:00 2016 +0800

----------------------------------------------------------------------
.../hadoop_jmx_collector/metric_collector.py | 67 +++++++++++---------
1 file changed, 37 insertions(+), 30 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f5f07402/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index 6205e1f..c7a5599 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -26,8 +26,8 @@ import httplib import logging import threading import fnmatch -import subprocess import os +import multiprocessing # load six sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six'))
@@ -242,7 +242,7 @@ class KafkaMetricSender(MetricSender): def open(self): logging.info("Opening kafka connection for producer") self.kafka_client = KafkaClient(self.broker_list, timeout=55) -
self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500, + self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=False, batch_send_every_n=500, batch_send_every_t=30) self.start_time = time.time() @@ -325,51 +325,58 @@ class MetricCollector(threading.Thread): class Runner(object): @staticmethod - def run(*collectors): + def worker(collectors, config): """ - Execute concurrently - - :param threads: - :return: - """ - current_pid = os.getpid() + Execute concurrently + :param threads: + :return: + """ try: - argv = sys.argv - if len(argv) == 1: - config = Helper.load_config() - elif len(argv) == 2: - config = Helper.load_config(argv[1]) - else: - raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv)) - - logging.info("PID: %s", current_pid) for collector in collectors: try: collector.init(config) collector.start() except Exception as e: logging.exception(e) - for collector in collectors: - try: - collector.join(timeout = 55) - collector.close() - except BaseException as e: - logging.exception(e) + collector.join(timeout=55) exit(0) except BaseException as e: - if isinstance(e, SystemExit): - logging.info("Exit code: %s", e) - else: + if not isinstance(e, SystemExit): logging.exception(e) exit(1) - logging.info("Exit code: 1") finally: for collector in collectors: if not collector.is_closed(): collector.close() - logging.info("Ensuring process stopped: kill -9 %s", current_pid) - subprocess.call(["kill","-9",str(current_pid)]) + + @staticmethod + def run(*collectors): + config = None + argv = sys.argv + if len(argv) == 1: + config = Helper.load_config() + elif len(argv) == 2: + config = Helper.load_config(argv[1]) + else: + raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv)) + current_process = multiprocessing.current_process() + sub_process = multiprocessing.Process(target=Runner.worker, args=[collectors,config]) + 
sub_process.daemon = False + sub_process.name = "CollectorSubprocess" + try: + logging.info("Starting %s", sub_process) + sub_process.start() + logging.info("Current PID: %s, subprocess PID: %s", current_process.pid, sub_process.pid) + sub_process.join(timeout = 55) + except BaseException as e: + logging.exception(e) + finally: + if sub_process.is_alive(): + logging.info("%s is still alive, terminating", sub_process) + sub_process.terminate() + logging.info("%s exit code: %s", sub_process, sub_process.exitcode) + exit(0) class JmxMetricCollector(MetricCollector): selected_domain = None