jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/346574 )
Change subject: Split job spec from job runner
......................................................................

Split job spec from job runner

Change-Id: I44e976f46c7159e02c431c470e74e54f8ad12229
---
M bin/run-job
M processcontrol/crontab.py
A processcontrol/job_spec.py
D processcontrol/job_wrapper.py
A processcontrol/runner.py
R tests/test_job_runner.py
6 files changed, 199 insertions(+), 186 deletions(-)

Approvals:
  jenkins-bot: Verified
  Ejegg: Looks good to me, approved


diff --git a/bin/run-job b/bin/run-job
index 57f893d..62dca5f 100755
--- a/bin/run-job
+++ b/bin/run-job
@@ -5,15 +5,16 @@
 import sys
 import yaml
 
-from processcontrol import job_wrapper
+from processcontrol import runner
+from processcontrol import job_spec
 
 
 def list_jobs():
-    for job_name in job_wrapper.list():
+    for job_name in job_spec.list():
         try:
-            job = job_wrapper.load(job_name)
+            job = job_spec.load(job_name)
             message = "{job} - {human_name}".format(job=job_name, human_name=job.name)
-            status = job.status()
+            status = runner.JobRunner(job).status()
             if status is not None:
                 message += " " + yaml.dump(status)
         except AssertionError:
@@ -28,8 +29,9 @@
     args = parser.parse_args()
 
     if args.job:
-        job = job_wrapper.load(args.job)
-        job.run()
+        job = job_spec.load(args.job)
+        runner = runner.JobRunner(job)
+        runner.run()
 
     if args.list_jobs:
         list_jobs()

diff --git a/processcontrol/crontab.py b/processcontrol/crontab.py
index 9359cd0..82266ca 100644
--- a/processcontrol/crontab.py
+++ b/processcontrol/crontab.py
@@ -1,7 +1,7 @@
 from __future__ import print_function
 
 from . import config
-from . import job_wrapper
+from . import job_spec
 
 
 def make_cron():
@@ -9,12 +9,12 @@
     Read all files from the dir and output a crontab.
     '''
 
-    jobs = job_wrapper.list()
+    jobs = job_spec.list()
 
     cron_text = ""
     for job_name in jobs:
         # FIXME just use the configuration classes, no need for job
-        job = job_wrapper.load(job_name)
+        job = job_spec.load(job_name)
         tab = JobCrontab(job)
         cron_text += str(tab)

diff --git a/processcontrol/job_spec.py b/processcontrol/job_spec.py
new file mode 100644
index 0000000..a8a2e58
--- /dev/null
+++ b/processcontrol/job_spec.py
@@ -0,0 +1,66 @@
+import glob
+import os
+
+from . import config
+
+
+# TODO: uh has no raison d'etre now other than to demonstrate factoryness.
+def load(job_name):
+    return Job(slug=job_name)
+
+
+def list():
+    """Return a tuple of all available job names."""
+    job_directory = config.GlobalConfiguration().get("job_directory")
+    paths = sorted(glob.glob(job_directory + "/*.yaml"))
+    file_names = [os.path.basename(p) for p in paths]
+    job_names = [f.replace(".yaml", "") for f in file_names]
+    return job_names
+
+
+def job_path_for_slug(slug):
+    global_config = config.GlobalConfiguration()
+    job_directory = global_config.get("job_directory")
+    path = "{root_dir}/{slug}.yaml".format(root_dir=job_directory, slug=slug)
+    return path
+
+
+class Job(object):
+    def __init__(self, slug=None):
+        self.global_config = config.GlobalConfiguration()
+        self.config_path = job_path_for_slug(slug)
+
+        # Validate that we're not allowing directory traversal.
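
With the spec split out, a job definition can be loaded and inspected without
touching any runner machinery. A minimal sketch of using the new job_spec
module (the "sample_job" slug is hypothetical and assumes a matching
sample_job.yaml under the configured job_directory):

    from processcontrol import job_spec

    # Enumerate every *.yaml job definition in the configured job_directory.
    for name in job_spec.list():
        print(name)

    # Load a single spec; construction parses and validates the YAML but
    # runs nothing.
    job = job_spec.load("sample_job")
    print(job.name, job.enabled, job.timeout)
    print(job.commands)  # always a list, even when "command" was a string

Note that Job.__init__ still asserts that the resolved config path lives
inside job_directory, so a traversal attempt like load("../evil") fails the
assertion instead of reading an arbitrary file.
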
+        assert os.path.dirname(os.path.realpath(self.config_path)) == os.path.abspath(self.global_config.get("job_directory"))
+
+        self.config = config.JobConfiguration(self.global_config, self.config_path)
+
+        self.name = self.config.get("name")
+        self.slug = slug
+        if self.config.has("timeout"):
+            self.timeout = self.config.get("timeout")
+        else:
+            self.timeout = 0
+
+        if self.config.has("disabled") and self.config.get("disabled") is True:
+            self.enabled = False
+        else:
+            self.enabled = True
+
+        if not self.config.has("schedule"):
+            self.enabled = False
+
+        self.environment = os.environ.copy()
+        if self.config.has("environment"):
+            # Force all values to string
+            str_env = {k: str(v) for k, v in self.config.get("environment").items()}
+            self.environment.update(str_env)
+
+        command = self.config.get("command")
+        if hasattr(command, "encode"):
+            # Is stringlike, so cast to a list and handle along with the plural
+            # case below.
+            command = [command]
+        # Otherwise, it's already a list.
+
+        self.commands = command

diff --git a/processcontrol/job_wrapper.py b/processcontrol/job_wrapper.py
deleted file mode 100644
index 565c188..0000000
--- a/processcontrol/job_wrapper.py
+++ /dev/null
@@ -1,171 +0,0 @@
-import datetime
-import glob
-import os
-import pwd
-import shlex
-import subprocess
-import threading
-
-from . import config
-from . import lock
-from . import mailer
-from . import output_streamer
-
-
-# TODO: uh has no raison d'etre now other than to demonstrate factoryness.
-def load(job_name):
-    return JobWrapper(slug=job_name)
-
-
-def list():
-    """Return a tuple of all available job names."""
-    job_directory = config.GlobalConfiguration().get("job_directory")
-    paths = sorted(glob.glob(job_directory + "/*.yaml"))
-    file_names = [os.path.basename(p) for p in paths]
-    job_names = [f.replace(".yaml", "") for f in file_names]
-    return job_names
-
-
-def job_path_for_slug(slug):
-    global_config = config.GlobalConfiguration()
-    job_directory = global_config.get("job_directory")
-    path = "{root_dir}/{slug}.yaml".format(root_dir=job_directory, slug=slug)
-    return path
-
-
-class JobWrapper(object):
-    def __init__(self, slug=None):
-        self.global_config = config.GlobalConfiguration()
-        self.config_path = job_path_for_slug(slug)
-
-        # Validate that we're not allowing directory traversal.
-        assert os.path.dirname(os.path.realpath(self.config_path)) == os.path.abspath(self.global_config.get("job_directory"))
-
-        self.config = config.JobConfiguration(self.global_config, self.config_path)
-
-        self.name = self.config.get("name")
-        self.slug = slug
-        self.start_time = datetime.datetime.utcnow()
-        self.mailer = mailer.Mailer(self.config)
-        if self.config.has("timeout"):
-            self.timeout = self.config.get("timeout")
-        else:
-            self.timeout = 0
-
-        if self.config.has("disabled") and self.config.get("disabled") is True:
-            self.enabled = False
-        else:
-            self.enabled = True
-
-        if not self.config.has("schedule"):
-            self.enabled = False
-
-        self.environment = os.environ.copy()
-        if self.config.has("environment"):
-            # Force all values to string
-            str_env = {k: str(v) for k, v in self.config.get("environment").items()}
-            self.environment.update(str_env)
-
-        command = self.config.get("command")
-        if hasattr(command, "encode"):
-            # Is stringlike, so cast to a list and handle along with the plural
-            # case below.
-            command = [command]
-        # Otherwise, it's already a list.
-
-        self.commands = command
-
-    def run(self):
-        # Check that we are the service user.
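
The timeout handling deleted above carries over into runner.py below: a
threading.Timer acts as a watchdog that kills the child process if it runs
too long. A standalone sketch of the same pattern, assuming nothing from the
codebase (run_with_timeout is illustrative, not a real helper here):

    import subprocess
    import threading

    def run_with_timeout(command, timeout_minutes):
        process = subprocess.Popen(command)
        # The watchdog fires on its own thread and kills the still-running
        # child, mirroring fail_timeout().
        timer = threading.Timer(timeout_minutes * 60, process.kill)
        timer.start()
        try:
            process.wait()
        finally:
            # Cancel the watchdog once the command finishes in time.
            timer.cancel()
        return process.returncode

    # A killed child reports a negative returncode (-9 for SIGKILL on
    # POSIX), which is what the FIXME about fail_exitcode refers to.
    run_with_timeout(["sleep", "2"], timeout_minutes=1)
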
-        service_user = str(self.global_config.get("user"))
-        if service_user.isdigit():
-            passwd_entry = pwd.getpwuid(int(service_user))
-        else:
-            passwd_entry = pwd.getpwnam(service_user)
-        assert passwd_entry.pw_uid == os.getuid()
-
-        lock.begin(job_tag=self.slug)
-
-        config.log.info("Running job {name} ({slug})".format(name=self.name, slug=self.slug))
-
-        # Spawn timeout monitor thread.
-        if self.timeout > 0:
-            # Convert minutes to seconds.
-            timeout_seconds = self.timeout * 60
-            timer = threading.Timer(timeout_seconds, self.fail_timeout)
-            timer.start()
-
-        try:
-            for command_line in self.commands:
-                self.run_command(command_line)
-        finally:
-            lock.end()
-            if self.timeout > 0:
-                timer.cancel()
-
-    def run_command(self, command_string):
-        # TODO: Log commandline into the output log as well.
-        config.log.info("Running command: {cmd}".format(cmd=command_string))
-
-        command = shlex.split(command_string)
-
-        self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.environment)
-        streamer = output_streamer.OutputStreamer(self.process, self.slug, self.start_time)
-        streamer.start()
-
-        # should be safe from deadlocks because our OutputStreamer
-        # has been consuming stderr and stdout
-        self.process.wait()
-
-        streamer.stop()
-
-        return_code = self.process.returncode
-        if return_code != 0:
-            self.fail_exitcode(return_code)
-
-        self.process = None
-
-    def fail_exitcode(self, return_code):
-        message = "Job {name} failed with code {code}".format(name=self.name, code=return_code)
-        config.log.error(message)
-        # TODO: Prevent future jobs according to config.
-        self.mailer.fail_mail(message)
-        raise JobFailure(message)
-
-    def fail_has_stderr(self, stderr_data):
-        message = "Job {name} printed things to stderr:".format(name=self.name)
-        config.log.error(message)
-        body = stderr_data.decode("utf-8")
-        config.log.error(body)
-        self.mailer.fail_mail(message, body)
-        raise JobFailure(message)
-
-    def fail_timeout(self):
-        self.process.kill()
-        message = "Job {name} timed out after {timeout} minutes".format(name=self.name, timeout=self.timeout)
-        config.log.error(message)
-        self.mailer.fail_mail(message)
-        # FIXME: Job will return SIGKILL now, fail_exitcode should ignore that signal now?
-        raise JobFailure(message)
-
-    def status(self):
-        """Check for any running instances of this job, in this process or another.
-
-        Returns a crappy dict, or None if no process is found.
-
-        Do not use this function to gate the workflow, explicitly assert the
-        lock instead."""
-
-        # FIXME: DRY--find a good line to cut at to split out lock.read_pid.
-        lock_path = lock.path_for_job(self.slug)
-        if os.path.exists(lock_path):
-            with open(lock_path, "r") as f:
-                pid = int(f.read().strip())
-            # TODO: encapsulate
-            return {"status": "running", "pid": pid}
-
-        return None
-
-
-class JobFailure(RuntimeError):
-    pass

diff --git a/processcontrol/runner.py b/processcontrol/runner.py
new file mode 100644
index 0000000..1b05b57
--- /dev/null
+++ b/processcontrol/runner.py
@@ -0,0 +1,114 @@
+import datetime
+import os
+import pwd
+import shlex
+import subprocess
+import threading
+
+from . import config
+from . import lock
+from . import mailer
+from . import output_streamer
+
+
+class JobRunner(object):
+    def __init__(self, job):
+        self.global_config = config.GlobalConfiguration()
+        self.job = job
+        self.mailer = mailer.Mailer(self.job.config)
+
+    def run(self):
+        # Check that we are the service user.
+        service_user = str(self.global_config.get("user"))
+        if service_user.isdigit():
+            passwd_entry = pwd.getpwuid(int(service_user))
+        else:
+            passwd_entry = pwd.getpwnam(service_user)
+        assert passwd_entry.pw_uid == os.getuid()
+
+        lock.begin(job_tag=self.job.slug)
+        self.start_time = datetime.datetime.utcnow()
+
+        config.log.info("Running job {name} ({slug})".format(name=self.job.name, slug=self.job.slug))
+
+        # Spawn timeout monitor thread.
+        if self.job.timeout > 0:
+            # Convert minutes to seconds.
+            timeout_seconds = self.job.timeout * 60
+            timer = threading.Timer(timeout_seconds, self.fail_timeout)
+            timer.start()
+
+        try:
+            for command_line in self.job.commands:
+                self.run_command(command_line)
+        finally:
+            lock.end()
+            if self.job.timeout > 0:
+                timer.cancel()
+
+    def run_command(self, command_string):
+        # TODO: Log commandline into the output log as well.
+        config.log.info("Running command: {cmd}".format(cmd=command_string))
+
+        command = shlex.split(command_string)
+
+        self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.job.environment)
+        streamer = output_streamer.OutputStreamer(self.process, self.job.slug, self.start_time)
+        streamer.start()
+
+        # should be safe from deadlocks because our OutputStreamer
+        # has been consuming stderr and stdout
+        self.process.wait()
+
+        streamer.stop()
+
+        return_code = self.process.returncode
+        if return_code != 0:
+            self.fail_exitcode(return_code)
+
+        self.process = None
+
+    def fail_exitcode(self, return_code):
+        message = "Job {name} failed with code {code}".format(name=self.job.name, code=return_code)
+        config.log.error(message)
+        # TODO: Prevent future jobs according to config.
+        self.mailer.fail_mail(message)
+        raise JobFailure(message)
+
+    def fail_has_stderr(self, stderr_data):
+        message = "Job {name} printed things to stderr:".format(name=self.job.name)
+        config.log.error(message)
+        body = stderr_data.decode("utf-8")
+        config.log.error(body)
+        self.mailer.fail_mail(message, body)
+        raise JobFailure(message)
+
+    def fail_timeout(self):
+        self.process.kill()
+        message = "Job {name} timed out after {timeout} minutes".format(name=self.job.name, timeout=self.job.timeout)
+        config.log.error(message)
+        self.mailer.fail_mail(message)
+        # FIXME: Job will return SIGKILL now, fail_exitcode should ignore that signal now?
+        raise JobFailure(message)
+
+    def status(self):
+        """Check for any running instances of this job, in this process or another.
+
+        Returns a crappy dict, or None if no process is found.
+
+        Do not use this function to gate the workflow, explicitly assert the
+        lock instead."""
+
+        # FIXME: DRY--find a good line to cut at to split out lock.read_pid.
+        lock_path = lock.path_for_job(self.job.slug)
+        if os.path.exists(lock_path):
+            with open(lock_path, "r") as f:
+                pid = int(f.read().strip())
+            # TODO: encapsulate
+            return {"status": "running", "pid": pid}
+
+        return None
+
+
+class JobFailure(RuntimeError):
+    pass
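
JobRunner.status() behaves exactly as the old JobWrapper.status() did: it
reports a pid when the job's lock file exists, else None. As a
self-contained sketch of that check (the path shown is made up; the real one
comes from lock.path_for_job):

    import os

    def read_status(lock_path):
        """Return {"status": "running", "pid": ...} if a lock file holds a
        pid, else None."""
        if not os.path.exists(lock_path):
            return None
        with open(lock_path, "r") as f:
            pid = int(f.read().strip())
        return {"status": "running", "pid": pid}

    # Hypothetical lock path; process-control derives it from the job slug.
    print(read_status("/tmp/process-control/sample_job.lock"))
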
diff --git a/tests/test_job_wrapper.py b/tests/test_job_runner.py
similarity index 87%
rename from tests/test_job_wrapper.py
rename to tests/test_job_runner.py
index 31e3e9d..a82e808 100644
--- a/tests/test_job_wrapper.py
+++ b/tests/test_job_runner.py
@@ -4,7 +4,8 @@
 import os
 import testfixtures
 
-from processcontrol import job_wrapper
+from processcontrol import runner
+from processcontrol import job_spec
 from . import override_config
@@ -18,8 +19,9 @@
 
 def run_job(job_name):
-    job = job_wrapper.load(job_name)
-    job.run()
+    job = job_spec.load(job_name)
+    job_runner = runner.JobRunner(job)
+    job_runner.run()
 
 
 def test_success():
@@ -45,7 +47,7 @@
 @mock.patch("smtplib.SMTP")
 @testfixtures.log_capture()
 def test_return_code(MockSmtp, caplog):
-    with nose.tools.assert_raises(job_wrapper.JobFailure):
+    with nose.tools.assert_raises(runner.JobFailure):
         run_job("return_code")
 
     loglines = caplog.actual()
@@ -59,7 +61,7 @@
 @mock.patch("smtplib.SMTP")
 @testfixtures.log_capture()
 def test_timeout(MockSmtp, caplog):
-    with nose.tools.assert_raises(job_wrapper.JobFailure):
+    with nose.tools.assert_raises(runner.JobFailure):
         run_job("timeout")
 
     loglines = caplog.actual()
@@ -72,7 +74,7 @@
 @mock.patch("smtplib.SMTP")
 @testfixtures.log_capture()
 def test_stderr(MockSmtp, caplog):
-    with nose.tools.assert_raises(job_wrapper.JobFailure):
+    with nose.tools.assert_raises(runner.JobFailure):
         run_job("errors")
 
     loglines = list(caplog.actual())

--
To view, visit https://gerrit.wikimedia.org/r/346574
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I44e976f46c7159e02c431c470e74e54f8ad12229
Gerrit-PatchSet: 7
Gerrit-Project: wikimedia/fundraising/process-control
Gerrit-Branch: master
Gerrit-Owner: Awight <awi...@wikimedia.org>
Gerrit-Reviewer: Cdentinger <cdentin...@wikimedia.org>
Gerrit-Reviewer: Ejegg <eeggles...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>