jenkins-bot has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/346574 )

Change subject: Split job spec from job runner
......................................................................


Split job spec from job runner

Change-Id: I44e976f46c7159e02c431c470e74e54f8ad12229
---
M bin/run-job
M processcontrol/crontab.py
A processcontrol/job_spec.py
D processcontrol/job_wrapper.py
A processcontrol/runner.py
R tests/test_job_runner.py
6 files changed, 199 insertions(+), 186 deletions(-)

Approvals:
  jenkins-bot: Verified
  Ejegg: Looks good to me, approved
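
This change replaces the single job_wrapper entry point with a two-step call
pattern: load an immutable job spec, then hand it to a runner that owns all
execution state. A minimal sketch of the new pattern, as exercised by
bin/run-job and the tests below (the "my_job" slug is hypothetical):

    from processcontrol import job_spec, runner

    job = job_spec.load("my_job")        # parses <job_directory>/my_job.yaml
    job_runner = runner.JobRunner(job)   # execution state lives on the runner
    job_runner.run()                     # raises runner.JobFailure on error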



diff --git a/bin/run-job b/bin/run-job
index 57f893d..62dca5f 100755
--- a/bin/run-job
+++ b/bin/run-job
@@ -5,15 +5,16 @@
 import sys
 import yaml
 
-from processcontrol import job_wrapper
+from processcontrol import runner
+from processcontrol import job_spec
 
 
 def list_jobs():
-       for job_name in job_wrapper.list():
+       for job_name in job_spec.list():
                try:
-                       job = job_wrapper.load(job_name)
+                       job = job_spec.load(job_name)
                        message = "{job} - {human_name}".format(job=job_name, human_name=job.name)
-                       status = job.status()
+                       status = runner.JobRunner(job).status()
                        if status is not None:
                                message += "    " + yaml.dump(status)
                except AssertionError:
@@ -28,8 +29,9 @@
        args = parser.parse_args()
 
        if args.job:
-               job = job_wrapper.load(args.job)
-               job.run()
+               job = job_spec.load(args.job)
+               job_runner = runner.JobRunner(job)
+               job_runner.run()
 
        if args.list_jobs:
                list_jobs()
diff --git a/processcontrol/crontab.py b/processcontrol/crontab.py
index 9359cd0..82266ca 100644
--- a/processcontrol/crontab.py
+++ b/processcontrol/crontab.py
@@ -1,7 +1,7 @@
 from __future__ import print_function
 
 from . import config
-from . import job_wrapper
+from . import job_spec
 
 
 def make_cron():
@@ -9,12 +9,12 @@
     Read all files from the dir and output a crontab.
     '''
 
-    jobs = job_wrapper.list()
+    jobs = job_spec.list()
     cron_text = ""
 
     for job_name in jobs:
         # FIXME just use the configuration classes, no need for job
-        job = job_wrapper.load(job_name)
+        job = job_spec.load(job_name)
         tab = JobCrontab(job)
 
         cron_text += str(tab)
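
After this change the crontab generator depends only on job_spec, never on
anything that can execute a job. The FIXME above goes further: building a cron
line needs only the configuration, not a Job object. A sketch of the
configuration-only loading it points toward, using only calls visible in this
change (printing the schedule stands in for building a cron line):

    from processcontrol import config, job_spec

    global_config = config.GlobalConfiguration()
    for name in job_spec.list():
        # Load the YAML spec directly, skipping the Job wrapper.
        path = job_spec.job_path_for_slug(name)
        job_config = config.JobConfiguration(global_config, path)
        if job_config.has("schedule"):
            print(name, job_config.get("schedule"))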
diff --git a/processcontrol/job_spec.py b/processcontrol/job_spec.py
new file mode 100644
index 0000000..a8a2e58
--- /dev/null
+++ b/processcontrol/job_spec.py
@@ -0,0 +1,66 @@
+import glob
+import os
+
+from . import config
+
+
+# TODO: This has no raison d'etre now, other than to demonstrate factoryness.
+def load(job_name):
+    return Job(slug=job_name)
+
+
+def list():
+    """Return a list of all available job names."""
+    job_directory = config.GlobalConfiguration().get("job_directory")
+    paths = sorted(glob.glob(job_directory + "/*.yaml"))
+    file_names = [os.path.basename(p) for p in paths]
+    job_names = [f.replace(".yaml", "") for f in file_names]
+    return job_names
+
+
+def job_path_for_slug(slug):
+    global_config = config.GlobalConfiguration()
+    job_directory = global_config.get("job_directory")
+    path = "{root_dir}/{slug}.yaml".format(root_dir=job_directory, slug=slug)
+    return path
+
+
+class Job(object):
+    def __init__(self, slug=None):
+        self.global_config = config.GlobalConfiguration()
+        self.config_path = job_path_for_slug(slug)
+
+        # Validate that we're not allowing directory traversal.
+        assert os.path.dirname(os.path.realpath(self.config_path)) == os.path.abspath(self.global_config.get("job_directory"))
+
+        self.config = config.JobConfiguration(self.global_config, self.config_path)
+
+        self.name = self.config.get("name")
+        self.slug = slug
+        if self.config.has("timeout"):
+            self.timeout = self.config.get("timeout")
+        else:
+            self.timeout = 0
+
+        if self.config.has("disabled") and self.config.get("disabled") is True:
+            self.enabled = False
+        else:
+            self.enabled = True
+
+        if not self.config.has("schedule"):
+            self.enabled = False
+
+        self.environment = os.environ.copy()
+        if self.config.has("environment"):
+            # Force all values to string
+            str_env = {k: str(v) for k, v in self.config.get("environment").items()}
+            self.environment.update(str_env)
+
+        command = self.config.get("command")
+        if hasattr(command, "encode"):
+            # Is stringlike, so cast to a list and handle along with the plural
+            # case below.
+            command = [command]
+        # Otherwise, it's already a list.
+
+        self.commands = command
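
The new module is pure configuration: nothing in it spawns a process or takes
a lock. A sketch of enumerating and inspecting specs with the module above
(the printed attributes are exactly the ones set in Job.__init__):

    from processcontrol import job_spec

    for name in job_spec.list():         # slugs of all *.yaml files in job_directory
        job = job_spec.load(name)
        print(job.slug, job.name, job.enabled, job.timeout)
        print(job.commands)              # always a list, even for a single command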
diff --git a/processcontrol/job_wrapper.py b/processcontrol/job_wrapper.py
deleted file mode 100644
index 565c188..0000000
--- a/processcontrol/job_wrapper.py
+++ /dev/null
@@ -1,171 +0,0 @@
-import datetime
-import glob
-import os
-import pwd
-import shlex
-import subprocess
-import threading
-
-from . import config
-from . import lock
-from . import mailer
-from . import output_streamer
-
-
-# TODO: uh has no raison d'etre now other than to demonstrate factoryness.
-def load(job_name):
-    return JobWrapper(slug=job_name)
-
-
-def list():
-    """Return a tuple of all available job names."""
-    job_directory = config.GlobalConfiguration().get("job_directory")
-    paths = sorted(glob.glob(job_directory + "/*.yaml"))
-    file_names = [os.path.basename(p) for p in paths]
-    job_names = [f.replace(".yaml", "") for f in file_names]
-    return job_names
-
-
-def job_path_for_slug(slug):
-    global_config = config.GlobalConfiguration()
-    job_directory = global_config.get("job_directory")
-    path = "{root_dir}/{slug}.yaml".format(root_dir=job_directory, slug=slug)
-    return path
-
-
-class JobWrapper(object):
-    def __init__(self, slug=None):
-        self.global_config = config.GlobalConfiguration()
-        self.config_path = job_path_for_slug(slug)
-
-        # Validate that we're not allowing directory traversal.
-        assert os.path.dirname(os.path.realpath(self.config_path)) == os.path.abspath(self.global_config.get("job_directory"))
-
-        self.config = config.JobConfiguration(self.global_config, self.config_path)
-
-        self.name = self.config.get("name")
-        self.slug = slug
-        self.start_time = datetime.datetime.utcnow()
-        self.mailer = mailer.Mailer(self.config)
-        if self.config.has("timeout"):
-            self.timeout = self.config.get("timeout")
-        else:
-            self.timeout = 0
-
-        if self.config.has("disabled") and self.config.get("disabled") is True:
-            self.enabled = False
-        else:
-            self.enabled = True
-
-        if not self.config.has("schedule"):
-            self.enabled = False
-
-        self.environment = os.environ.copy()
-        if self.config.has("environment"):
-            # Force all values to string
-            str_env = {k: str(v) for k, v in self.config.get("environment").items()}
-            self.environment.update(str_env)
-
-        command = self.config.get("command")
-        if hasattr(command, "encode"):
-            # Is stringlike, so cast to a list and handle along with the plural
-            # case below.
-            command = [command]
-        # Otherwise, it's already a list.
-
-        self.commands = command
-
-    def run(self):
-        # Check that we are the service user.
-        service_user = str(self.global_config.get("user"))
-        if service_user.isdigit():
-            passwd_entry = pwd.getpwuid(int(service_user))
-        else:
-            passwd_entry = pwd.getpwnam(service_user)
-        assert passwd_entry.pw_uid == os.getuid()
-
-        lock.begin(job_tag=self.slug)
-
-        config.log.info("Running job {name} ({slug})".format(name=self.name, slug=self.slug))
-
-        # Spawn timeout monitor thread.
-        if self.timeout > 0:
-            # Convert minutes to seconds.
-            timeout_seconds = self.timeout * 60
-            timer = threading.Timer(timeout_seconds, self.fail_timeout)
-            timer.start()
-
-        try:
-            for command_line in self.commands:
-                self.run_command(command_line)
-        finally:
-            lock.end()
-            if self.timeout > 0:
-                timer.cancel()
-
-    def run_command(self, command_string):
-        # TODO: Log commandline into the output log as well.
-        config.log.info("Running command: {cmd}".format(cmd=command_string))
-
-        command = shlex.split(command_string)
-
-        self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.environment)
-        streamer = output_streamer.OutputStreamer(self.process, self.slug, self.start_time)
-        streamer.start()
-
-        # should be safe from deadlocks because our OutputStreamer
-        # has been consuming stderr and stdout
-        self.process.wait()
-
-        streamer.stop()
-
-        return_code = self.process.returncode
-        if return_code != 0:
-            self.fail_exitcode(return_code)
-
-        self.process = None
-
-    def fail_exitcode(self, return_code):
-        message = "Job {name} failed with code {code}".format(name=self.name, code=return_code)
-        config.log.error(message)
-        # TODO: Prevent future jobs according to config.
-        self.mailer.fail_mail(message)
-        raise JobFailure(message)
-
-    def fail_has_stderr(self, stderr_data):
-        message = "Job {name} printed things to stderr:".format(name=self.name)
-        config.log.error(message)
-        body = stderr_data.decode("utf-8")
-        config.log.error(body)
-        self.mailer.fail_mail(message, body)
-        raise JobFailure(message)
-
-    def fail_timeout(self):
-        self.process.kill()
-        message = "Job {name} timed out after {timeout} minutes".format(name=self.name, timeout=self.timeout)
-        config.log.error(message)
-        self.mailer.fail_mail(message)
-        # FIXME: Job will return SIGKILL now, fail_exitcode should ignore that signal now?
-        raise JobFailure(message)
-
-    def status(self):
-        """Check for any running instances of this job, in this process or another.
-
-        Returns a crappy dict, or None if no process is found.
-
-        Do not use this function to gate the workflow, explicitly assert the
-        lock instead."""
-
-        # FIXME: DRY--find a good line to cut at to split out lock.read_pid.
-        lock_path = lock.path_for_job(self.slug)
-        if os.path.exists(lock_path):
-            with open(lock_path, "r") as f:
-                pid = int(f.read().strip())
-                # TODO: encapsulate
-                return {"status": "running", "pid": pid}
-
-        return None
-
-
-class JobFailure(RuntimeError):
-    pass
diff --git a/processcontrol/runner.py b/processcontrol/runner.py
new file mode 100644
index 0000000..1b05b57
--- /dev/null
+++ b/processcontrol/runner.py
@@ -0,0 +1,114 @@
+import datetime
+import os
+import pwd
+import shlex
+import subprocess
+import threading
+
+from . import config
+from . import lock
+from . import mailer
+from . import output_streamer
+
+
+class JobRunner(object):
+    def __init__(self, job):
+        self.global_config = config.GlobalConfiguration()
+        self.job = job
+        self.mailer = mailer.Mailer(self.job.config)
+
+    def run(self):
+        # Check that we are the service user.
+        service_user = str(self.global_config.get("user"))
+        if service_user.isdigit():
+            passwd_entry = pwd.getpwuid(int(service_user))
+        else:
+            passwd_entry = pwd.getpwnam(service_user)
+        assert passwd_entry.pw_uid == os.getuid()
+
+        lock.begin(job_tag=self.job.slug)
+        self.start_time = datetime.datetime.utcnow()
+
+        config.log.info("Running job {name} ({slug})".format(name=self.job.name, slug=self.job.slug))
+
+        # Spawn timeout monitor thread.
+        if self.job.timeout > 0:
+            # Convert minutes to seconds.
+            timeout_seconds = self.job.timeout * 60
+            timer = threading.Timer(timeout_seconds, self.fail_timeout)
+            timer.start()
+
+        try:
+            for command_line in self.job.commands:
+                self.run_command(command_line)
+        finally:
+            lock.end()
+            if self.job.timeout > 0:
+                timer.cancel()
+
+    def run_command(self, command_string):
+        # TODO: Log commandline into the output log as well.
+        config.log.info("Running command: {cmd}".format(cmd=command_string))
+
+        command = shlex.split(command_string)
+
+        self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.job.environment)
+        streamer = output_streamer.OutputStreamer(self.process, self.job.slug, self.start_time)
+        streamer.start()
+
+        # should be safe from deadlocks because our OutputStreamer
+        # has been consuming stderr and stdout
+        self.process.wait()
+
+        streamer.stop()
+
+        return_code = self.process.returncode
+        if return_code != 0:
+            self.fail_exitcode(return_code)
+
+        self.process = None
+
+    def fail_exitcode(self, return_code):
+        message = "Job {name} failed with code {code}".format(name=self.job.name, code=return_code)
+        config.log.error(message)
+        # TODO: Prevent future jobs according to config.
+        self.mailer.fail_mail(message)
+        raise JobFailure(message)
+
+    def fail_has_stderr(self, stderr_data):
+        message = "Job {name} printed things to stderr:".format(name=self.job.name)
+        config.log.error(message)
+        body = stderr_data.decode("utf-8")
+        config.log.error(body)
+        self.mailer.fail_mail(message, body)
+        raise JobFailure(message)
+
+    def fail_timeout(self):
+        self.process.kill()
+        message = "Job {name} timed out after {timeout} minutes".format(name=self.job.name, timeout=self.job.timeout)
+        config.log.error(message)
+        self.mailer.fail_mail(message)
+        # FIXME: Job will return SIGKILL now, fail_exitcode should ignore that signal now?
+        raise JobFailure(message)
+
+    def status(self):
+        """Check for any running instances of this job, in this process or another.
+
+        Returns an ad-hoc status dict, or None if no process is found.
+
+        Do not use this function to gate the workflow, explicitly assert the
+        lock instead."""
+
+        # FIXME: DRY--find a good line to cut at to split out lock.read_pid.
+        lock_path = lock.path_for_job(self.job.slug)
+        if os.path.exists(lock_path):
+            with open(lock_path, "r") as f:
+                pid = int(f.read().strip())
+                # TODO: encapsulate
+                return {"status": "running", "pid": pid}
+
+        return None
+
+
+class JobFailure(RuntimeError):
+    pass
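
All runtime state (start_time, the child process, the mailer) now lives on
JobRunner, so one spec can be wrapped and run repeatedly. A sketch of the
failure contract, assuming a "nightly" spec exists; note that status() is
informational only, since run() itself asserts the lock:

    from processcontrol import job_spec, runner

    job = job_spec.load("nightly")       # hypothetical slug
    job_runner = runner.JobRunner(job)

    status = job_runner.status()
    if status is not None:
        print("already running:", status)
    try:
        job_runner.run()                 # takes the lock, runs each command
    except runner.JobFailure as exc:
        print("failed:", exc)            # failure mail has already been sent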
diff --git a/tests/test_job_wrapper.py b/tests/test_job_runner.py
similarity index 87%
rename from tests/test_job_wrapper.py
rename to tests/test_job_runner.py
index 31e3e9d..a82e808 100644
--- a/tests/test_job_wrapper.py
+++ b/tests/test_job_runner.py
@@ -4,7 +4,8 @@
 import os
 import testfixtures
 
-from processcontrol import job_wrapper
+from processcontrol import runner
+from processcontrol import job_spec
 
 from . import override_config
 
@@ -18,8 +19,9 @@
 
 
 def run_job(job_name):
-    job = job_wrapper.load(job_name)
-    job.run()
+    job = job_spec.load(job_name)
+    job_runner = runner.JobRunner(job)
+    job_runner.run()
 
 
 def test_success():
@@ -45,7 +47,7 @@
 @mock.patch("smtplib.SMTP")
 @testfixtures.log_capture()
 def test_return_code(MockSmtp, caplog):
-    with nose.tools.assert_raises(job_wrapper.JobFailure):
+    with nose.tools.assert_raises(runner.JobFailure):
         run_job("return_code")
 
     loglines = caplog.actual()
@@ -59,7 +61,7 @@
 @mock.patch("smtplib.SMTP")
 @testfixtures.log_capture()
 def test_timeout(MockSmtp, caplog):
-    with nose.tools.assert_raises(job_wrapper.JobFailure):
+    with nose.tools.assert_raises(runner.JobFailure):
         run_job("timeout")
 
     loglines = caplog.actual()
@@ -72,7 +74,7 @@
 @mock.patch("smtplib.SMTP")
 @testfixtures.log_capture()
 def test_stderr(MockSmtp, caplog):
-    with nose.tools.assert_raises(job_wrapper.JobFailure):
+    with nose.tools.assert_raises(runner.JobFailure):
         run_job("errors")
 
     loglines = list(caplog.actual())

-- 
To view, visit https://gerrit.wikimedia.org/r/346574
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I44e976f46c7159e02c431c470e74e54f8ad12229
Gerrit-PatchSet: 7
Gerrit-Project: wikimedia/fundraising/process-control
Gerrit-Branch: master
Gerrit-Owner: Awight <awi...@wikimedia.org>
Gerrit-Reviewer: Cdentinger <cdentin...@wikimedia.org>
Gerrit-Reviewer: Ejegg <eeggles...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>
