ArielGlenn has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/306201

Change subject: scheduler: move cache related methods into a class
......................................................................

scheduler: move cache related methods into a class

Change-Id: I3efa0a4e6130af160be2863af37464160e6a927a
---
M xmldumps-backup/dumpscheduler.py
 1 file changed, 96 insertions(+), 60 deletions(-)


  git pull ssh://gerrit.wikimedia.org:29418/operations/dumps refs/changes/01/306201/1

diff --git a/xmldumps-backup/dumpscheduler.py b/xmldumps-backup/dumpscheduler.py
index 3d29d4b..c16fc9b 100644
--- a/xmldumps-backup/dumpscheduler.py
+++ b/xmldumps-backup/dumpscheduler.py
@@ -72,6 +72,89 @@
     return converted
 
 
+class Cacher(object):
+    '''
+    save and restore command state to and from a cache file, so that
+    an interrupted run can be resumed
+    '''
+    def __init__(self, cachepath, my_id, restore, rerun):
+        '''
+        cachepath: path of the cache file, or None to disable the cache
+        my_id:     id of this run, recorded at the top of the cache file
+        restore:   if False, restore_from_cache() never reads the cache
+        rerun:     if True, every restored command set is marked for rerun
+        '''
+        self.cachepath = cachepath
+        self.my_id = my_id
+        self.restore = restore
+        self.rerun = rerun
+
+    def save_to_cache(self, commands):
+        '''
+        write a cache file recording all commands in series
+        and their state, in case this script meets an untimely demise
+
+        the file is written under a temporary name and then renamed
+        into place, so a crash mid-write never leaves a truncated cache
+        '''
+        if not self.cachepath:
+            return
+        tmp_path = self.cachepath + ".tmp"
+        with open(tmp_path, "w+") as cache_p:
+            cache_p.write("id:%s\n" % self.my_id)
+            for entry in commands:
+                # dump if there are processes running or processes yet
+                # to run for command set
+                if entry['processids'] or entry['count'] > 0:
+                    cache_p.write(json.dumps(entry, default=json_obj_dump) +
+                                  "\n")
+        os.rename(tmp_path, self.cachepath)
+
+    def restore_from_cache(self):
+        '''
+        if this script has been interrupted, restore command state information
+        from a cache file, including the value of the variable set in the
+        environment of all commands previously started by this script
+
+        this is necessary because we won't have the Popen object for those
+        processes now, if they are still running. We will only have the pid
+        which could have been re-used. We check the environ for a variable
+        with that value to make sure it was really our process.
+
+        the run id found in the cache replaces self.my_id; callers must
+        adopt it so that the processes recorded there are recognized.
+
+        returns None if there is nothing to restore from (no cache path,
+        restore not requested, or no cache file); otherwise returns the
+        list of restored entries, which is empty if every command set
+        recorded in the cache had already completed
+        '''
+        if (not self.cachepath or not self.restore or
+                not os.path.exists(self.cachepath)):
+            return None
+
+        commands = []
+        with open(self.cachepath, "r") as cache_p:
+            for line in cache_p:
+                line = line.rstrip("\n")
+                if not line:
+                    continue
+                if line.startswith("id:"):
+                    self.my_id = line.split(":", 1)[1]
+                    continue
+
+                entry = json.loads(line)
+                entry['slots'] = int(entry['slots'])
+                entry['count'] = int(entry['count'])
+                entry['done'] = int(entry['done'])
+                entry['procidsfromcache'] = entry['processids'][:]
+                entry['processes'] = []
+                if self.rerun:
+                    entry['rerun'] = True
+                commands.append(entry)
+        return commands
+
+
 class Scheduler(object):
     '''
     handle running a sequence of commands, each command possibly to
@@ -94,16 +177,14 @@
         self.total_slots = slots
         self.commands = []
         self.free_slots = slots
-        self.cache = cache
         self.mailhost = mailhost
-        self.restore = restore
-        self.rerun = rerun
         self.pid = os.getpid()
         self.my_id = "%s%d%s" % (time.strftime("%Y%m%d%H%M%S", time.gmtime()),
                                  self.pid, os.geteuid())
         self.my_prefix = 'PYMGR_ID'
         self.email_from = email_from
         self.formatvars = format_convert(formatvars)
+        self.cacher = Cacher(cache, self.my_id, restore, rerun)
 
     def handle_hup(self, signo_unused, frame_unused):
         """
@@ -147,18 +228,17 @@
         free slots (e.g. cpu resources) are available.
         '''
 
-        if self.cache and self.restore:
-            self.restore_from_cache()
-            if self.rerun:
-                for entry in self.commands:
-                    entry['rerun'] = True
-        else:
-            self.read_commands()
+        self.commands = self.cacher.restore_from_cache()
+        if self.commands is None:
+            self.commands = self.read_commands()
+        # a restore replaces the run id with the one recorded in the cache;
+        # adopt it so the processes started by the earlier run are ours
+        self.my_id = self.cacher.my_id
         os.environ[self.my_prefix] = self.my_id
 
         while True:
             if self.start_command() is None:
-                self.save_to_cache()
+                self.cacher.save_to_cache(self.commands)
                 LOG.info("all command sets completed.")
                 break
             # do we want this configurable? meh
@@ -170,15 +250,17 @@
         read text entries describing each set of commands to be run
         '''
 
+        commands = []
         for line in self.input:
             line = line.rstrip('\n')
             if line.startswith('#') or line.startswith(" ") or not line:
                 continue
             if self.formatvars is not None:
                 line = line.format(**self.formatvars)
-            self.commands.append(line_to_entry(line, self.total_slots))
+            commands.append(line_to_entry(line, self.total_slots))
         if self.input != sys.stdin:
             self.input.close()
+        return commands
 
     def check_pid(self, pid):
         '''
@@ -353,7 +435,7 @@
                     else:
                         self.mark_process_done(None, pid, entry)
         if will_exit:
-            self.save_to_cache()
+            self.cacher.save_to_cache(self.commands)
             LOG.error("exiting after command failure")
             sys.exit(1)
 
@@ -395,54 +477,8 @@
             entry['processids'].append(process.pid)
             self.free_slots = self.free_slots - entry['slots']
             entry['count'] -= 1
-            self.save_to_cache()
+            self.cacher.save_to_cache(self.commands)
         return True
-
-    def save_to_cache(self):
-        '''
-        write a cache file recording all commands in series
-        and their state, in case this script meets an untimely demise
-        '''
-
-        cache_p = open(self.cache + ".tmp", "w+")
-        cache_p.write("id:%s\n" % self.my_id)
-        for entry in self.commands:
-            # dump if there are processes running or processes yet
-            # to run for command set
-            if entry['processids'] or entry['count'] > 0:
-                cache_p.write(json.dumps(entry, default=json_obj_dump) + "\n")
-        cache_p.close()
-        os.rename(self.cache + ".tmp", self.cache)
-
-    def restore_from_cache(self):
-        '''
-        if this script has been interrupted, restore command state information
-        from a cache file, including the value of the variable set in the
-        environment of all commands previously started by this script
-
-        this is necessary because we won't have the Popen object for those
-        processes now, if they are still running. We will only have the pid
-        which could have been re-used. We check the environ for a variable
-        with that value to make sure it was really our process.
-        '''
-
-        if not os.path.exists(self.cache):
-            return
-
-        cache_p = open(self.cache, "r")
-        for line in cache_p:
-            line = line.rstrip("\n")
-            if line.startswith("id:"):
-                self.my_id = line.split(":", 1)[1]
-                continue
-
-            entry = json.loads(line)
-            entry['slots'] = int(entry['slots'])
-            entry['count'] = int(entry['count'])
-            entry['done'] = int(entry['done'])
-            entry['procidsfromcache'] = entry['processids'][:]
-            entry['processes'] = []
-            self.commands.append(entry)
 
 
 def usage(message=None):

-- 
To view, visit https://gerrit.wikimedia.org/r/306201
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I3efa0a4e6130af160be2863af37464160e6a927a
Gerrit-PatchSet: 1
Gerrit-Project: operations/dumps
Gerrit-Branch: master
Gerrit-Owner: ArielGlenn <ar...@wikimedia.org>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to