Ejegg has uploaded a new change for review. ( https://gerrit.wikimedia.org/r/345424 )

Change subject: WIP stream subprocess stdout to log file
......................................................................

WIP stream subprocess stdout to log file

Change-Id: Icf9fec6ced84b3107511a8ccc4c905076ad1ffd6
---
M processcontrol/job_wrapper.py
A processcontrol/output_streamer.py
2 files changed, 56 insertions(+), 36 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/wikimedia/fundraising/process-control refs/changes/24/345424/1

diff --git a/processcontrol/job_wrapper.py b/processcontrol/job_wrapper.py
index 321798b..2b3fa2e 100644
--- a/processcontrol/job_wrapper.py
+++ b/processcontrol/job_wrapper.py
@@ -8,6 +8,7 @@
 from . import config
 from . import lock
 from . import mailer
+from . import output_streamer
 
 
 # TODO: uh has no raison d'etre now other than to demonstrate factoryness.
@@ -64,6 +65,9 @@
         command = shlex.split(self.config.get("command"))
 
         self.process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=self.environment)
+        streamer = output_streamer.OutputStreamer(self.process, self.slug, self.start_time)
+        streamer.start()
+
         timer = threading.Timer(self.timeout, self.fail_timeout)
         timer.start()
 
@@ -71,7 +75,7 @@
             # FIXME: This doesn't stream, so large output will be buffered in memory.
             (stdout_data, stderr_data) = self.process.communicate()
 
-            self.store_job_output(stdout_data, stderr_data)
+#            self.store_job_output(stdout_data, stderr_data)
 
             if len(stderr_data) > 0:
                 self.fail_has_stderr(stderr_data)
@@ -102,41 +106,6 @@
         config.log.error(message)
         self.mailer.fail_mail(message)
         # FIXME: Job will return SIGKILL now, fail_exitcode should ignore that signal now?
-
-    def store_job_output(self, stdout_data, stderr_data):
-        output_directory = self.global_config.get("output_directory")
-        assert os.access(output_directory, os.W_OK)
-
-        # per-job directory
-        job_directory = output_directory + "/" + self.name
-        if not os.path.exists(job_directory):
-            os.makedirs(job_directory)
-
-        timestamp = self.start_time.strftime("%Y%m%d-%H%M%S")
-        filename = "{logdir}/{name}-{timestamp}.log".format(logdir=job_directory, name=self.name, timestamp=timestamp)
-        with open(filename, "a") as out:
-            header = (
-                "===========\n"
-                "{name} ({pid}), started at {time}\n"
-                "-----------\n"
-            ).format(name=self.name, pid=self.process.pid, time=self.start_time.isoformat())
-            out.write(header)
-
-            if len(stdout_data) == 0:
-                buf = "* No output *\n"
-            else:
-                buf = stdout_data.decode("utf-8")
-            out.write(buf)
-
-            if len(stderr_data) > 0:
-                header = (
-                    "-----------\n"
-                    "Even worse, the job emitted errors:\n"
-                    "-----------\n"
-                )
-                out.write(header)
-
-                out.write(stderr_data.decode("utf-8"))
 
     def status(self):
         """Check for any running instances of this job, in this process or 
another.
diff --git a/processcontrol/output_streamer.py b/processcontrol/output_streamer.py
new file mode 100644
index 0000000..e09f3d0
--- /dev/null
+++ b/processcontrol/output_streamer.py
@@ -0,0 +1,51 @@
+import logging.config
+import os
+import threading
+
+from . import config
+
+class OutputStreamer(object):
+
+    def __init__(self, process, slug, start_time):
+        self.stream = process.stdout
+        self.pid = process.pid
+        self.slug = slug
+        self.start_time = start_time
+
+    def start(self):
+        self.create_logger()
+        out_thread = threading.Thread(target=self.read_output)
+        out_thread.daemon = True
+        out_thread.start()
+
+    def read_output(self):
+        while True:
+            line = self.stream.readline()
+            if line == '':
+                break;
+            self.logger.info(line)
+
+    def create_logger(self):
+        output_directory = config.GlobalConfiguration().get("output_directory")
+        assert os.access(output_directory, os.W_OK)
+
+        # per-job directory
+        job_directory = output_directory + "/" + self.slug
+        if not os.path.exists(job_directory):
+            os.makedirs(job_directory)
+
+        timestamp = self.start_time.strftime("%Y%m%d-%H%M%S")
+        filename = "{logdir}/{name}-{timestamp}.log".format(logdir=job_directory, name=self.slug, timestamp=timestamp)
+
+        self.logger = logging.getLogger(self.slug)
+        self.logger.setLevel(logging.INFO)
+        
+        fh = logging.FileHandler(filename)
+        formatter = logging.Formatter("%(asctime)s - %(message)s")
+        fh.setFormatter(formatter)
+
+        self.logger.addHandler(fh)
+    
+        self.logger.info("===========")
+        self.logger.info("{name} ({pid})".format(name=self.slug, pid=self.pid))
+        self.logger.info("-----------")

-- 
To view, visit https://gerrit.wikimedia.org/r/345424
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Icf9fec6ced84b3107511a8ccc4c905076ad1ffd6
Gerrit-PatchSet: 1
Gerrit-Project: wikimedia/fundraising/process-control
Gerrit-Branch: master
Gerrit-Owner: Ejegg <eeggles...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to