ArielGlenn has uploaded a new change for review. https://gerrit.wikimedia.org/r/306201
Change subject: scheduler: move cache related methods into a class ...................................................................... scheduler: move cache related methods into a class Change-Id: I3efa0a4e6130af160be2863af37464160e6a927a --- M xmldumps-backup/dumpscheduler.py 1 file changed, 74 insertions(+), 60 deletions(-) git pull ssh://gerrit.wikimedia.org:29418/operations/dumps refs/changes/01/306201/1 diff --git a/xmldumps-backup/dumpscheduler.py b/xmldumps-backup/dumpscheduler.py index 3d29d4b..c16fc9b 100644 --- a/xmldumps-backup/dumpscheduler.py +++ b/xmldumps-backup/dumpscheduler.py @@ -72,6 +72,70 @@ return converted +class Cacher(object): + ''' + save and restore from command cache + ''' + def __init__(self, cachepath, my_id, restore, rerun): + self.cachepath = cachepath + self.my_id = my_id + self.restore = restore + self.rerun = rerun + + def save_to_cache(self, commands): + ''' + write a cache file recording all commands in series + and their state, in case this script meets an untimely demise + ''' + + if self.cachepath is None: + return + cache_p = open(self.cachepath + ".tmp", "w+") + cache_p.write("id:%s\n" % self.my_id) + for entry in commands: + # dump if there are processes running or processes yet + # to run for command set + if entry['processids'] or entry['count'] > 0: + cache_p.write(json.dumps(entry, default=json_obj_dump) + "\n") + cache_p.close() + os.rename(self.cachepath + ".tmp", self.cachepath) + + def restore_from_cache(self): + ''' + if this script has been interrupted, restore command state information + from a cache file, including the value of the variable set in the + environment of all commands previously started by this script + + this is necessary because we won't have the Popen object for those + processes now, if they are still running. We will only have the pid + which could have been re-used. We check the environ for a variable + with that value to make sure it was really our process. + ''' + + commands = [] + if self.cachepath is None or not self.restore or not os.path.exists(self.cachepath): + return commands + + cache_p = open(self.cachepath, "r") + for line in cache_p: + line = line.rstrip("\n") + if line.startswith("id:"): + self.my_id = line.split(":", 1)[1] + continue + + entry = json.loads(line) + entry['slots'] = int(entry['slots']) + entry['count'] = int(entry['count']) + entry['done'] = int(entry['done']) + entry['procidsfromcache'] = entry['processids'][:] + entry['processes'] = [] + commands.append(entry) + if self.rerun: + for entry in commands: + entry['rerun'] = True + return commands + + class Scheduler(object): ''' handle running a sequence of commands, each command possibly to @@ -94,16 +158,14 @@ self.total_slots = slots self.commands = [] self.free_slots = slots - self.cache = cache self.mailhost = mailhost - self.restore = restore - self.rerun = rerun self.pid = os.getpid() self.my_id = "%s%d%s" % (time.strftime("%Y%m%d%H%M%S", time.gmtime()), self.pid, os.geteuid()) self.my_prefix = 'PYMGR_ID' self.email_from = email_from self.formatvars = format_convert(formatvars) + self.cacher = Cacher(cache, self.my_id, restore, rerun) def handle_hup(self, signo_unused, frame_unused): """ @@ -147,18 +209,14 @@ free slots (e.g. cpu resources) are available. ''' - if self.cache and self.restore: - self.restore_from_cache() - if self.rerun: - for entry in self.commands: - entry['rerun'] = True - else: - self.read_commands() + self.commands = self.cacher.restore_from_cache() + if not self.commands: + self.commands = self.read_commands() os.environ[self.my_prefix] = self.my_id while True: if self.start_command() is None: - self.save_to_cache() + self.cacher.save_to_cache(self.commands) LOG.info("all command sets completed.") break # do we want this configurable? meh @@ -170,15 +228,17 @@ read text entries describing each set of commands to be run ''' + commands = [] for line in self.input: line = line.rstrip('\n') if line.startswith('#') or line.startswith(" ") or not line: continue if self.formatvars is not None: line = line.format(**self.formatvars) - self.commands.append(line_to_entry(line, self.total_slots)) + commands.append(line_to_entry(line, self.total_slots)) if self.input != sys.stdin: self.input.close() + return commands def check_pid(self, pid): ''' @@ -353,7 +413,7 @@ else: self.mark_process_done(None, pid, entry) if will_exit: - self.save_to_cache() + self.cacher.save_to_cache(self.commands) LOG.error("exiting after command failure") sys.exit(1) @@ -395,54 +455,8 @@ entry['processids'].append(process.pid) self.free_slots = self.free_slots - entry['slots'] entry['count'] -= 1 - self.save_to_cache() + self.cacher.save_to_cache(self.commands) return True - - def save_to_cache(self): - ''' - write a cache file recording all commands in series - and their state, in case this script meets an untimely demise - ''' - - cache_p = open(self.cache + ".tmp", "w+") - cache_p.write("id:%s\n" % self.my_id) - for entry in self.commands: - # dump if there are processes running or processes yet - # to run for command set - if entry['processids'] or entry['count'] > 0: - cache_p.write(json.dumps(entry, default=json_obj_dump) + "\n") - cache_p.close() - os.rename(self.cache + ".tmp", self.cache) - - def restore_from_cache(self): - ''' - if this script has been interrupted, restore command state information - from a cache file, including the value of the variable set in the - environment of all commands previously started by this script - - this is necessary because we won't have the Popen object for those - processes now, if they are still running. We will only have the pid - which could have been re-used. We check the environ for a variable - with that value to make sure it was really our process. - ''' - - if not os.path.exists(self.cache): - return - - cache_p = open(self.cache, "r") - for line in cache_p: - line = line.rstrip("\n") - if line.startswith("id:"): - self.my_id = line.split(":", 1)[1] - continue - - entry = json.loads(line) - entry['slots'] = int(entry['slots']) - entry['count'] = int(entry['count']) - entry['done'] = int(entry['done']) - entry['procidsfromcache'] = entry['processids'][:] - entry['processes'] = [] - self.commands.append(entry) def usage(message=None): -- To view, visit https://gerrit.wikimedia.org/r/306201 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I3efa0a4e6130af160be2863af37464160e6a927a Gerrit-PatchSet: 1 Gerrit-Project: operations/dumps Gerrit-Branch: master Gerrit-Owner: ArielGlenn <ar...@wikimedia.org> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits