Ejegg has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/345424 )
Change subject: WIP stream subprocess stdout to log file ...................................................................... WIP stream subprocess stdout to log file Change-Id: Icf9fec6ced84b3107511a8ccc4c905076ad1ffd6 --- M processcontrol/job_wrapper.py A processcontrol/output_streamer.py 2 files changed, 56 insertions(+), 36 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/process-control refs/changes/24/345424/1 diff --git a/processcontrol/job_wrapper.py b/processcontrol/job_wrapper.py index 321798b..2b3fa2e 100644 --- a/processcontrol/job_wrapper.py +++ b/processcontrol/job_wrapper.py @@ -8,6 +8,7 @@ from . import config from . import lock from . import mailer +from . import output_streamer # TODO: uh has no raison d'etre now other than to demonstrate factoryness. @@ -64,6 +65,9 @@ command = shlex.split(self.config.get("command")) self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.environment) + streamer = output_streamer.OutputStreamer(self.process, self.slug, self.start_time) + streamer.start() + timer = threading.Timer(self.timeout, self.fail_timeout) timer.start() @@ -71,7 +75,7 @@ # FIXME: This doesn't stream, so large output will be buffered in memory. (stdout_data, stderr_data) = self.process.communicate() - self.store_job_output(stdout_data, stderr_data) +# self.store_job_output(stdout_data, stderr_data) if len(stderr_data) > 0: self.fail_has_stderr(stderr_data) @@ -102,41 +106,6 @@ config.log.error(message) self.mailer.fail_mail(message) # FIXME: Job will return SIGKILL now, fail_exitcode should ignore that signal now? - - def store_job_output(self, stdout_data, stderr_data): - output_directory = self.global_config.get("output_directory") - assert os.access(output_directory, os.W_OK) - - # per-job directory - job_directory = output_directory + "/" + self.name - if not os.path.exists(job_directory): - os.makedirs(job_directory) - - timestamp = self.start_time.strftime("%Y%m%d-%H%M%S") - filename = "{logdir}/{name}-{timestamp}.log".format(logdir=job_directory, name=self.name, timestamp=timestamp) - with open(filename, "a") as out: - header = ( - "===========\n" - "{name} ({pid}), started at {time}\n" - "-----------\n" - ).format(name=self.name, pid=self.process.pid, time=self.start_time.isoformat()) - out.write(header) - - if len(stdout_data) == 0: - buf = "* No output *\n" - else: - buf = stdout_data.decode("utf-8") - out.write(buf) - - if len(stderr_data) > 0: - header = ( - "-----------\n" - "Even worse, the job emitted errors:\n" - "-----------\n" - ) - out.write(header) - - out.write(stderr_data.decode("utf-8")) def status(self): """Check for any running instances of this job, in this process or another. diff --git a/processcontrol/output_streamer.py b/processcontrol/output_streamer.py new file mode 100644 index 0000000..e09f3d0 --- /dev/null +++ b/processcontrol/output_streamer.py @@ -0,0 +1,51 @@ +import logging.config +import os +import threading + +from . import config + +class OutputStreamer(object): + + def __init__(self, process, slug, start_time): + self.stream = process.stdout + self.pid = process.pid + self.slug = slug + self.start_time = start_time + + def start(self): + self.create_logger() + out_thread = threading.Thread(target=self.read_output) + out_thread.daemon = True + out_thread.start() + + def read_output(self): + while True: + line = self.stream.readline() + if line == '': + break; + self.logger.info(line) + + def create_logger(self): + output_directory = config.GlobalConfiguration().get("output_directory") + assert os.access(output_directory, os.W_OK) + + # per-job directory + job_directory = output_directory + "/" + self.slug + if not os.path.exists(job_directory): + os.makedirs(job_directory) + + timestamp = self.start_time.strftime("%Y%m%d-%H%M%S") + filename = "{logdir}/{name}-{timestamp}.log".format(logdir=job_directory, name=self.slug, timestamp=timestamp) + + self.logger = logging.getLogger(self.slug) + self.logger.setLevel(logging.INFO) + + fh = logging.FileHandler(filename) + formatter = logging.Formatter("%(asctime)s - %(message)s") + fh.setFormatter(formatter) + + self.logger.addHandler(fh) + + self.logger.info("===========") + self.logger.info("{name} ({pid})".format(name=self.slug, pid=self.pid)) + self.logger.info("-----------") -- To view, visit https://gerrit.wikimedia.org/r/345424 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Icf9fec6ced84b3107511a8ccc4c905076ad1ffd6 Gerrit-PatchSet: 1 Gerrit-Project: wikimedia/fundraising/process-control Gerrit-Branch: master Gerrit-Owner: Ejegg <eeggles...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits