ArielGlenn has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/343539 )
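In practical terms this change only moves code: XmlDump and BigXmlDump now live in the new dumps/xmlcontentjobs.py module, while the other xml job classes stay in dumps/xmljobs.py, so downstream callers see nothing but an import-path change. A minimal sketch of the before/after imports, mirroring the runner.py hunk further below:

    # before this change: all xml job classes came from dumps.xmljobs
    # from dumps.xmljobs import XmlDump, XmlLogging, XmlStub, BigXmlDump, AbstractDump

    # after this change: content dump jobs come from the new module
    from dumps.xmljobs import XmlLogging, XmlStub, AbstractDump
    from dumps.xmlcontentjobs import XmlDump, BigXmlDump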
Change subject: move xml content dump jobs out to separate module
......................................................................

move xml content dump jobs out to separate module

Change-Id: I12fce108339a4a5af48037827f69b0b2aadecd25
---
M xmldumps-backup/dumps/recombinejobs.py
M xmldumps-backup/dumps/recompressjobs.py
M xmldumps-backup/dumps/runner.py
A xmldumps-backup/dumps/xmlcontentjobs.py
M xmldumps-backup/dumps/xmljobs.py
5 files changed, 774 insertions(+), 764 deletions(-)

Approvals:
  ArielGlenn: Looks good to me, approved
  jenkins-bot: Verified

diff --git a/xmldumps-backup/dumps/recombinejobs.py b/xmldumps-backup/dumps/recombinejobs.py
index b634b12..f071cd7 100644
--- a/xmldumps-backup/dumps/recombinejobs.py
+++ b/xmldumps-backup/dumps/recombinejobs.py
@@ -6,7 +6,7 @@
 from os.path import exists
 from dumps.exceptions import BackupError
 from dumps.jobs import Dump
-from dumps.xmljobs import XmlDump
+from dumps.xmlcontentjobs import XmlDump


 class RecombineXmlStub(Dump):
diff --git a/xmldumps-backup/dumps/recompressjobs.py b/xmldumps-backup/dumps/recompressjobs.py
index 142072f..f966f05 100644
--- a/xmldumps-backup/dumps/recompressjobs.py
+++ b/xmldumps-backup/dumps/recompressjobs.py
@@ -7,7 +7,7 @@
 from dumps.exceptions import BackupError
 from dumps.fileutils import DumpFilename
 from dumps.jobs import Dump
-from dumps.xmljobs import XmlDump
+from dumps.xmlcontentjobs import XmlDump


 class XmlMultiStreamDump(XmlDump):
diff --git a/xmldumps-backup/dumps/runner.py b/xmldumps-backup/dumps/runner.py
index a681a0e..9acf564 100644
--- a/xmldumps-backup/dumps/runner.py
+++ b/xmldumps-backup/dumps/runner.py
@@ -14,7 +14,8 @@
 from dumps.tablesjobs import PrivateTable, PublicTable, TitleDump, AllTitleDump
 from dumps.recombinejobs import RecombineAbstractDump, RecombineXmlDump
 from dumps.recombinejobs import RecombineXmlStub, RecombineXmlRecompressDump
-from dumps.xmljobs import XmlDump, XmlLogging, XmlStub, BigXmlDump, AbstractDump
+from dumps.xmljobs import XmlLogging, XmlStub, AbstractDump
+from dumps.xmlcontentjobs import XmlDump, BigXmlDump
 from dumps.recompressjobs import XmlMultiStreamDump, XmlRecompressDump
 from dumps.flowjob import FlowDump
diff --git a/xmldumps-backup/dumps/xmlcontentjobs.py b/xmldumps-backup/dumps/xmlcontentjobs.py
new file mode 100644
index 0000000..d6e4cce
--- /dev/null
+++ b/xmldumps-backup/dumps/xmlcontentjobs.py
@@ -0,0 +1,768 @@
+'''
+All xml content dump jobs are defined here
+'''
+
+import re
+import os
+from os.path import exists
+import signal
+
+from dumps.CommandManagement import CommandPipeline
+from dumps.exceptions import BackupError
+from dumps.fileutils import DumpFile, DumpFilename
+from dumps.utils import MultiVersion, MiscUtils
+from dumps.jobs import Dump
+from dumps.WikiDump import Locker
+
+
+class XmlDump(Dump):
+    """Primary XML dumps, one section at a time."""
+    def __init__(self, subset, name, desc, detail, item_for_stubs, prefetch,
+                 prefetchdate, spawn,
+                 wiki, partnum_todo, parts=False, checkpoints=False, checkpoint_file=None,
+                 page_id_range=None, verbose=False):
+        self._subset = subset
+        self._detail = detail
+        self._desc = desc
+        self._prefetch = prefetch
+        self._prefetchdate = prefetchdate
+        self._spawn = spawn
+        self._parts = parts
+        if self._parts:
+            self._parts_enabled = True
+            self.onlyparts = True
+        self._page_id = {}
+        self._partnum_todo = partnum_todo
+
+        self.wiki = wiki
+        self.item_for_stubs = item_for_stubs
+        if checkpoints:
+            self._checkpoints_enabled = True
+        self.checkpoint_file = checkpoint_file
+        self.page_id_range =
page_id_range + self.verbose = verbose + self._prerequisite_items = [self.item_for_stubs] + self._check_truncation = True + Dump.__init__(self, name, desc, self.verbose) + + def get_dumpname_base(self): + return 'pages-' + + def get_dumpname(self): + return self.get_dumpname_base() + self._subset + + def get_filetype(self): + return "xml" + + def get_file_ext(self): + return "bz2" + + def get_stub_files(self, runner, partnum=None): + ''' + get the stub files pertaining to our dumpname, which is *one* of + articles, pages-current, pages-history. + stubs include all of these together. + we will either return the one full stubs file that exists + or the one stub file part, if we are (re)running a specific + file part (subjob), or all file parts if we are (re)running + the entire job which is configured for subjobs. + + arguments: + runner - Runner object + partnum (int) - number of file part (subjob) if any + ''' + if partnum is None: + partnum = self._partnum_todo + if not self.dumpname.startswith(self.get_dumpname_base()): + raise BackupError("dumpname %s of unknown form for this job" % self.dumpname) + + dumpname = self.dumpname[len(self.get_dumpname_base()):] + stub_dumpnames = self.item_for_stubs.list_dumpnames() + for sname in stub_dumpnames: + if sname.endswith(dumpname): + stub_dumpname = sname + input_files = self.item_for_stubs.list_outfiles_for_input(runner.dump_dir, [stub_dumpname]) + if self._parts_enabled: + if partnum is not None: + for inp_file in input_files: + if inp_file.partnum_int == partnum: + input_files = [inp_file] + break + return input_files + + def get_chkptfile_from_pageids(self): + if ',' in self.page_id_range: + first_page_id, last_page_id = self.page_id_range.split(',', 1) + else: + first_page_id = self.page_id_range + last_page_id = "00000" # indicates no last page id specified, go to end of stub + checkpoint_string = DumpFilename.make_checkpoint_string(first_page_id, last_page_id) + if self._partnum_todo: + partnum = self._partnum_todo + else: + # fixme is that right? maybe NOT + partnum = None + fileobj = DumpFilename(self.get_dumpname(), self.wiki.date, self.get_filetype(), + self.get_file_ext(), partnum, checkpoint_string) + return fileobj.filename + + def get_missing_before(self, needed_range, have_range): + # given range of numbers needed and range of numbers we have, + # return range of numbers needed before first number we have, + # or None if none + if have_range is None: + return needed_range + elif needed_range is None or int(have_range[0]) <= int(needed_range[0]): + return None + else: + return (needed_range[0], str(int(have_range[0]) - 1), needed_range[2]) + + def find_missing_ranges(self, needed, have): + # given list tuples of ranges of numbers needed, and ranges of numbers we have, + # determine the ranges of numbers missing and return list of said tuples + needed_index = 0 + have_index = 0 + missing = [] + + if not needed: + return missing + if not have: + return needed + + needed_range = needed[needed_index] + have_range = have[have_index] + + while True: + # if we're out of haves, append everything we need + if have_range is None: + missing.append(needed_range) + needed_index += 1 + if needed_index < len(needed): + needed_range = needed[needed_index] + else: + # end of needed. done + return missing + + before_have = self.get_missing_before(needed_range, have_range) + + # write anything we don't have + if before_have is not None: + missing.append(before_have) + + # if we haven't already exhausted all the ranges we have... 
+ if have_range is not None: + # skip over the current range of what we have + skip_up_to = str(int(have_range[1]) + 1) + while int(needed_range[1]) < int(skip_up_to): + needed_index += 1 + if needed_index < len(needed): + needed_range = needed[needed_index] + else: + # end of needed. done + return missing + + if int(needed_range[0]) < int(skip_up_to): + needed_range = (skip_up_to, needed_range[1], needed_range[2]) + + # get the next range we have + have_index += 1 + if have_index < len(have): + have_range = have[have_index] + else: + have_range = None + + return missing + + def chkpt_file_from_page_range(self, page_range, partnum): + checkpoint_string = DumpFilename.make_checkpoint_string( + page_range[0], page_range[1]) + output_file = DumpFilename(self.wiki, self.wiki.date, self.dumpname, + self.get_filetype(), self.get_file_ext(), + partnum, checkpoint=checkpoint_string, + temp=False) + return output_file + + def chkptfile_in_pagerange(self, fobj, chkpt_fobj): + """return False if both files are checkpoint files (with page ranges) + and the second file page range does not overlap with the first one""" + # not both checkpoint files: + if not fobj.is_checkpoint_file or not chkpt_fobj.is_checkpoint_file: + return True + # one or both end values are missing: + if not fobj.last_page_id and not chkpt_fobj.last_page_id: + return True + elif not fobj.last_page_id and int(chkpt_fobj.last_page_id) < int(fobj.first_page_id): + return True + elif not chkpt_fobj.last_page_id and int(fobj.last_page_id) < int(chkpt_fobj.first_page_id): + return True + # have end values for both files: + elif (int(fobj.first_page_id) <= int(chkpt_fobj.first_page_id) and + int(chkpt_fobj.first_page_id) <= int(fobj.last_page_id)): + return True + elif (int(chkpt_fobj.first_page_id) <= int(fobj.first_page_id) and + int(fobj.first_page_id) <= int(chkpt_fobj.last_page_id)): + return True + else: + return False + + def cleanup_tmp_files(self, dump_dir, runner): + """ + with checkpoint files turned on, this job writes output + to <something>.xml<-maybemorestuff>.bz2-tmp + and if those files are lying around after such a job dies, + we should clean them up + """ + if "cleanup_tmp_files" not in runner.enabled: + return + + # if we don't have the lock it's possible some + # other process is writing tmp files, don't touch + locker = Locker(self.wiki, self.wiki.date) + lockfiles = locker.is_locked() + if not lockfiles: + return + if len(lockfiles) > 1: + # more than one process with the lock? should not + # be possible, but if it is... touch nothing! + return + if not locker.check_owner(lockfiles[0], str(os.getpid())): + return + + to_delete = self.get_tmp_files(dump_dir) + for finfo in to_delete: + if exists(dump_dir.filename_public_path(finfo)): + os.remove(dump_dir.filename_public_path(finfo)) + elif exists(dump_dir.filename_private_path(finfo)): + os.remove(dump_dir.filename_private_path(finfo)) + + def run(self, runner): + # here we will either clean up or not depending on how we were called + # FIXME callers should set this appropriately and they don't right now + self.cleanup_old_files(runner.dump_dir, runner) + + # clean up all tmp output files from previous attempts of this job + # for this dump wiki and date, otherwise we'll wind up indexing + # them and hashsumming them etc. 
+ # they may have been left around from an interrupted or failed earlier + # run + self.cleanup_tmp_files(runner.dump_dir, runner) + + commands = [] + + todo = [] + + if self.page_id_range is not None: + # convert to checkpoint filename, handle the same way + self.checkpoint_file = self.get_chkptfile_from_pageids() + + if self.checkpoint_file: + todo = [self.checkpoint_file] + else: + # list all the output files that would be produced w/o + # checkpoint files on + outfiles = self.get_reg_files_for_filepart_possible( + runner.dump_dir, self.get_fileparts_list(), self.list_dumpnames()) + if self._checkpoints_enabled: + # get the stub list that would be used for the current run + stubs = self.get_stub_files(runner) + stubs = sorted(stubs, key=lambda thing: thing.filename) + + # get the page ranges covered by stubs + stub_ranges = [] + for stub in stubs: + fname = DumpFile(self.wiki, + runner.dump_dir.filename_public_path(stub, stub.date), + stub, self.verbose) + stub_ranges.append((fname.find_first_page_id_in_file(), + self.find_last_page_id(stub, runner), stub.partnum)) + + # get list of existing checkpoint files + chkpt_files = self.list_checkpt_files( + runner.dump_dir, [self.dumpname], runner.wiki.date, parts=None) + chkpt_files = sorted(chkpt_files, key=lambda thing: thing.filename) + # get the page ranges covered by existing checkpoint files + checkpoint_ranges = [(chkptfile.first_page_id, chkptfile.last_page_id, + chkptfile.partnum) + for chkptfile in chkpt_files] + if self.verbose: + print "checkpoint_ranges is", checkpoint_ranges + print "stub_ranges is", stub_ranges + + if not checkpoint_ranges: + # no page ranges covered by checkpoints. do all output files + # the usual way + todo = outfiles + else: + todo = [] + missing_ranges = self.find_missing_ranges(stub_ranges, checkpoint_ranges) + parts = self.get_fileparts_list() + for partnum in parts: + if not [1 for chkpt_range in checkpoint_ranges + if int(chkpt_range[2]) == partnum]: + # entire page range for a particular file part (subjob) + # is missing so generate the regular output file + todo.extend([outfile for outfile in outfiles + if int(outfile.partnum) == partnum]) + else: + # at least some page ranges are covered, just do those that + # are missing (maybe none are and list is empty) + todo.extend([self.chkpt_file_from_page_range((first, last), part) + for (first, last, part) in missing_ranges + if int(part) == partnum]) + else: + # do the missing files only + todo = [outfile for outfile in outfiles + if not os.path.exists(runner.dump_dir.filename_public_path(outfile))] + + partial_stubs = [] + if self.verbose: + print "todo is", [to.filename for to in todo] + + for fileobj in todo: + + stub_for_file = self.get_stub_files(runner, fileobj.partnum_int)[0] + + if fileobj.first_page_id is None: + partial_stubs.append(stub_for_file) + else: + stub_output_file = DumpFilename( + self.wiki, fileobj.date, fileobj.dumpname, + self.item_for_stubs.get_filetype(), + self.item_for_stubs.get_file_ext(), + fileobj.partnum, + DumpFilename.make_checkpoint_string( + fileobj.first_page_id, fileobj.last_page_id), temp=True) + + self.write_partial_stub(stub_for_file, stub_output_file, runner) + if not self.has_no_entries(stub_output_file, runner): + partial_stubs.append(stub_output_file) + + if self.verbose: + print "partial_stubs is", [ps.filename for ps in partial_stubs] + if partial_stubs: + stub_files = partial_stubs + else: + return + + for stub_file in stub_files: + series = self.build_command(runner, stub_file) + commands.append(series) + + 
error = runner.run_command(commands, callback_stderr=self.progress_callback, + callback_stderr_arg=runner) + if error: + raise BackupError("error producing xml file(s) %s" % self.dumpname) + + def has_no_entries(self, xmlfile, runner): + ''' + see if it has a page id in it or not. no? then return True + ''' + if xmlfile.is_temp_file: + path = os.path.join(self.wiki.config.temp_dir, xmlfile.filename) + else: + path = runner.dump_dir.filename_public_path(xmlfile, self.wiki.date) + fname = DumpFile(self.wiki, path, xmlfile, self.verbose) + return bool(fname.find_first_page_id_in_file() is None) + + def build_eta(self, runner): + """Tell the dumper script whether to make ETA estimate on page or revision count.""" + return "--current" + + # takes name of the output file + def build_filters(self, runner, inp_file): + """Construct the output filter options for dumpTextPass.php""" + # do we need checkpoints? ummm + xmlbz2 = runner.dump_dir.filename_public_path(inp_file) + + if not exists(self.wiki.config.bzip2): + raise BackupError("bzip2 command %s not found" % self.wiki.config.bzip2) + if self.wiki.config.bzip2[-6:] == "dbzip2": + bz2mode = "dbzip2" + else: + bz2mode = "bzip2" + return "--output=%s:%s" % (bz2mode, xmlbz2) + + def write_partial_stub(self, input_file, output_file, runner): + if not exists(self.wiki.config.writeuptopageid): + raise BackupError("writeuptopageid command %s not found" % + self.wiki.config.writeuptopageid) + + inputfile_path = runner.dump_dir.filename_public_path(input_file) + output_file_path = os.path.join(self.wiki.config.temp_dir, output_file.filename) + if input_file.file_ext == "gz": + command1 = "%s -dc %s" % (self.wiki.config.gzip, inputfile_path) + command2 = "%s > %s" % (self.wiki.config.gzip, output_file_path) + elif input_file.file_ext == '7z': + command1 = "%s e -si %s" % (self.wiki.config.sevenzip, inputfile_path) + command2 = "%s e -so %s" % (self.wiki.config.sevenzip, output_file_path) + elif input_file.file_ext == 'bz': + command1 = "%s -dc %s" % (self.wiki.config.bzip2, inputfile_path) + command2 = "%s > %s" % (self.wiki.config.bzip2, output_file_path) + else: + raise BackupError("unknown stub file extension %s" % input_file.file_ext) + if output_file.last_page_id is not None and output_file.last_page_id is not "00000": + command = [command1 + ("| %s %s %s |" % (self.wiki.config.writeuptopageid, + output_file.first_page_id, + output_file.last_page_id)) + command2] + else: + # no lastpageid? 
read up to eof of the specific stub file that's used for input + command = [command1 + ("| %s %s |" % (self.wiki.config.writeuptopageid, + output_file.first_page_id)) + command2] + + pipeline = [command] + series = [pipeline] + error = runner.run_command([series], shell=True) + if error: + raise BackupError("failed to write partial stub file %s" % output_file.filename) + + def get_last_lines_from_n(self, fileobj, runner, count): + if not fileobj.filename or not exists(runner.dump_dir.filename_public_path(fileobj)): + return None + + dumpfile = DumpFile(self.wiki, + runner.dump_dir.filename_public_path(fileobj, self.wiki.date), + fileobj, self.verbose) + pipeline = dumpfile.setup_uncompression_command() + + tail = self.wiki.config.tail + if not exists(tail): + raise BackupError("tail command %s not found" % tail) + tail_esc = MiscUtils.shell_escape(tail) + pipeline.append([tail, "-n", "+%s" % count]) + # without shell + proc = CommandPipeline(pipeline, quiet=True) + proc.run_pipeline_get_output() + last_lines = "" + if (proc.exited_successfully() or + (proc.get_failed_cmds_with_retcode() == + [[-signal.SIGPIPE, pipeline[0]]]) or + (proc.get_failed_cmds_with_retcode() == + [[signal.SIGPIPE + 128, pipeline[0]]])): + last_lines = proc.output() + return last_lines + + def get_lineno_last_page(self, fileobj, runner): + if not fileobj.filename or not exists(runner.dump_dir.filename_public_path(fileobj)): + return None + dumpfile = DumpFile(self.wiki, + runner.dump_dir.filename_public_path(fileobj, self.wiki.date), + fileobj, self.verbose) + pipeline = dumpfile.setup_uncompression_command() + grep = self.wiki.config.grep + if not exists(grep): + raise BackupError("grep command %s not found" % grep) + pipeline.append([grep, "-n", "<page>"]) + tail = self.wiki.config.tail + if not exists(tail): + raise BackupError("tail command %s not found" % tail) + pipeline.append([tail, "-1"]) + # without shell + proc = CommandPipeline(pipeline, quiet=True) + proc.run_pipeline_get_output() + if (proc.exited_successfully() or + (proc.get_failed_cmds_with_retcode() == + [[-signal.SIGPIPE, pipeline[0]]]) or + (proc.get_failed_cmds_with_retcode() == + [[signal.SIGPIPE + 128, pipeline[0]]])): + output = proc.output() + # 339915646: <page> + if ':' in output: + linecount = output.split(':')[0] + if linecount.isdigit(): + return linecount + return None + + def find_last_page_id(self, fileobj, runner): + count = self.get_lineno_last_page(fileobj, runner) + lastlines = self.get_last_lines_from_n(fileobj, runner, count) + # now look for the last page id in here. eww + if not lastlines: + return None + title_and_id_pattern = re.compile(r'<title>(?P<title>.+?)</title>\s*' + + r'(<ns>[0-9]+</ns>\s*)?' + + r'<id>(?P<pageid>\d+?)</id>') + result = None + for result in re.finditer(title_and_id_pattern, lastlines): + pass + if result is not None: + return result.group('pageid') + else: + return None + + def build_command(self, runner, stub_file): + """Build the command line for the dump, minus output and filter options""" + + # we write a temp file, it will be checkpointed every so often. 
+ temp = bool(self._checkpoints_enabled) + + output_file = DumpFilename(self.wiki, stub_file.date, self.dumpname, + self.get_filetype(), self.file_ext, stub_file.partnum, + DumpFilename.make_checkpoint_string(stub_file.first_page_id, + stub_file.last_page_id), + temp) + + stub_path = os.path.join(self.wiki.config.temp_dir, stub_file.filename) + if os.path.exists(stub_path): + # if this is a partial stub file in temp dir, use that + stub_option = "--stub=gzip:%s" % stub_path + else: + # use regular stub file + stub_option = "--stub=gzip:%s" % runner.dump_dir.filename_public_path(stub_file) + + # Try to pull text from the previous run; most stuff hasn't changed + # Source=$OutputDir/pages_$section.xml.bz2 + sources = [] + possible_sources = None + if self._prefetch: + possible_sources = self._find_previous_dump(runner, output_file.partnum) + # if we have a list of more than one then + # we need to check existence for each and put them together in a string + if possible_sources: + for sourcefile in possible_sources: + # if we are doing partial stub run, include only the analogous + # checkpointed prefetch files, if there are checkpointed files + # otherwise we'll use the all the sourcefiles reported + if not self.chkptfile_in_pagerange(stub_file, sourcefile): + continue + sname = runner.dump_dir.filename_public_path(sourcefile, sourcefile.date) + if exists(sname): + sources.append(sname) + if output_file.partnum: + partnum_str = "%s" % stub_file.partnum + else: + partnum_str = "" + if len(sources) > 0: + source = "bzip2:%s" % (";".join(sources)) + runner.show_runner_state("... building %s %s XML dump, with text prefetch from %s..." % + (self._subset, partnum_str, source)) + prefetch = "--prefetch=%s" % (source) + else: + runner.show_runner_state("... building %s %s XML dump, no text prefetch..." % + (self._subset, partnum_str)) + prefetch = "" + + if self._spawn: + spawn = "--spawn=%s" % (self.wiki.config.php) + else: + spawn = "" + + if not exists(self.wiki.config.php): + raise BackupError("php command %s not found" % self.wiki.config.php) + + if self._checkpoints_enabled: + checkpoint_time = "--maxtime=%s" % (self.wiki.config.checkpoint_time) + checkpoint_file = "--checkpointfile=%s" % output_file.new_filename( + output_file.dumpname, output_file.file_type, output_file.file_ext, + output_file.date, output_file.partnum, "p%sp%s", None) + else: + checkpoint_time = "" + checkpoint_file = "" + script_command = MultiVersion.mw_script_as_array(runner.wiki.config, "dumpTextPass.php") + dump_command = [self.wiki.config.php] + dump_command.extend(script_command) + dump_command.extend(["--wiki=%s" % runner.db_name, + "%s" % stub_option, + "%s" % prefetch, + "%s" % checkpoint_time, + "%s" % checkpoint_file, + "--report=1000", + "%s" % spawn]) + + dump_command = [entry for entry in dump_command if entry is not None] + command = dump_command + filters = self.build_filters(runner, output_file) + eta = self.build_eta(runner) + command.extend([filters, eta]) + pipeline = [command] + series = [pipeline] + return series + + # taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net) + def sort_nicely(self, mylist): + """ Sort the given list in the way that humans expect. 
+ """ + convert = lambda text: int(text) if text.isdigit() else text + alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)] + mylist = sorted(mylist, key=alphanum_key) + + def get_relevant_prefetch_files(self, file_list, start_page_id, end_page_id, date, runner): + possibles = [] + if len(file_list): + # (a) nasty hack, see below (b) + maxparts = 0 + for file_obj in file_list: + if file_obj.is_file_part and file_obj.partnum_int > maxparts: + maxparts = file_obj.partnum_int + if not file_obj.first_page_id: + fname = DumpFile( + self.wiki, runner.dump_dir.filename_public_path(file_obj, date), + file_obj, self.verbose) + file_obj.first_page_id = fname.find_first_page_id_in_file() + + # get the files that cover our range + for file_obj in file_list: + # If some of the file_objs in file_list could not be properly be parsed, some of + # the (int) conversions below will fail. However, it is of little use to us, + # which conversion failed. /If any/ conversion fails, it means, that that we do + # not understand how to make sense of the current file_obj. Hence we cannot use + # it as prefetch object and we have to drop it, to avoid passing a useless file + # to the text pass. (This could days as of a comment below, but by not passing + # a likely useless file, we have to fetch more texts from the database) + # + # Therefore try...except-ing the whole block is sufficient: If whatever error + # occurs, we do not abort, but skip the file for prefetch. + try: + # If we could properly parse + first_page_id_in_file = int(file_obj.first_page_id) + + # fixme what do we do here? this could be very expensive. is that worth it?? + if not file_obj.last_page_id: + # (b) nasty hack, see (a) + # it's not a checkpoint fle or we'd have the pageid in the filename + # so... temporary hack which will give expensive results + # if file part, and it's the last one, put none + # if it's not the last part, get the first pageid in the next + # part and subtract 1 + # if not file part, put none. + if file_obj.is_file_part and file_obj.partnum_int < maxparts: + for fname in file_list: + if fname.partnum_int == file_obj.partnum_int + 1: + # not true! this could be a few past where it really is + # (because of deleted pages that aren't included at all) + file_obj.last_page_id = str(int(fname.first_page_id) - 1) + if file_obj.last_page_id: + last_page_id_in_file = int(file_obj.last_page_id) + else: + last_page_id_in_file = None + + # FIXME there is no point in including files that have just a + # few rev ids in them that we need, and having to read through + # the whole file... could take hours or days (later it won't matter, + # right? but until a rewrite, this is important) + # also be sure that if a critical page is deleted by the time we + # try to figure out ranges, that we don't get hosed + if ((first_page_id_in_file <= int(start_page_id) and + (last_page_id_in_file is None or + last_page_id_in_file >= int(start_page_id))) or + (first_page_id_in_file >= int(start_page_id) and + (end_page_id is None or + first_page_id_in_file <= int(end_page_id)))): + possibles.append(file_obj) + except Exception as ex: + runner.debug( + "Couldn't process %s for prefetch. Format update? Corrupt file?" + % file_obj.filename) + return possibles + + # this finds the content file or files from the first previous successful dump + # to be used as input ("prefetch") for this run. 
+ def _find_previous_dump(self, runner, partnum=None): + """The previously-linked previous successful dump.""" + if partnum: + start_page_id = sum([self._parts[i] for i in range(0, int(partnum) - 1)]) + 1 + if len(self._parts) > int(partnum): + end_page_id = sum([self._parts[i] for i in range(0, int(partnum))]) + else: + end_page_id = None + else: + start_page_id = 1 + end_page_id = None + + if self._prefetchdate: + dumps = [self._prefetchdate] + else: + dumps = self.wiki.dump_dirs() + dumps = sorted(dumps, reverse=True) + for date in dumps: + if date == self.wiki.date: + runner.debug("skipping current dump for prefetch of job %s, date %s" % + (self.name(), self.wiki.date)) + continue + + # see if this job from that date was successful + if not runner.dumpjobdata.runinfo.status_of_old_dump_is_done( + runner, date, self.name(), self._desc): + runner.debug("skipping incomplete or failed dump for prefetch date %s" % date) + continue + + # first check if there are checkpoint files from this run we can use + files = self.list_checkpt_files( + runner.dump_dir, [self.dumpname], date, parts=None) + possible_prefetch_list = self.get_relevant_prefetch_files( + files, start_page_id, end_page_id, date, runner) + if len(possible_prefetch_list): + return possible_prefetch_list + + # ok, let's check for file parts instead, from any run + # (may not conform to our numbering for this job) + files = self.list_reg_files( + runner.dump_dir, [self.dumpname], date, parts=True) + possible_prefetch_list = self.get_relevant_prefetch_files( + files, start_page_id, end_page_id, date, runner) + if len(possible_prefetch_list): + return possible_prefetch_list + + # last shot, get output file that contains all the pages, if there is one + files = self.list_reg_files(runner.dump_dir, [self.dumpname], + date, parts=False) + # there is only one, don't bother to check for relevance :-P + possible_prefetch_list = files + files = [] + for prefetch in possible_prefetch_list: + possible = runner.dump_dir.filename_public_path(prefetch, date) + size = os.path.getsize(possible) + if size < 70000: + runner.debug("small %d-byte prefetch dump at %s, skipping" % (size, possible)) + continue + else: + files.append(prefetch) + if len(files): + return files + + runner.debug("Could not locate a prefetchable dump.") + return None + + def get_tmp_files(self, dump_dir, dump_names=None): + files = Dump.list_outfiles_for_cleanup(self, dump_dir, dump_names) + return [fileinfo for fileinfo in files if fileinfo.is_temp_file] + + def list_outfiles_for_cleanup(self, dump_dir, dump_names=None): + files = Dump.list_outfiles_for_cleanup(self, dump_dir, dump_names) + files_to_return = [] + + if self.page_id_range: + # this file is for one page range only + if ',' in self.page_id_range: + (first_page_id, last_page_id) = self.page_id_range.split(',', 2) + first_page_id = int(first_page_id) + last_page_id = int(last_page_id) + else: + first_page_id = int(self.page_id_range) + last_page_id = None + + # checkpoint files cover specific page ranges. 
for those, + # list only files within the given page range for cleanup + for fname in files: + if fname.is_checkpoint_file: + if (not first_page_id or + (fname.first_page_id and + (int(fname.first_page_id) >= first_page_id))): + if (not last_page_id or + (fname.last_page_id and + (int(fname.last_page_id) <= last_page_id))): + files_to_return.append(fname) + else: + files_to_return.append(fname) + else: + files_to_return = files + + return files_to_return + + +class BigXmlDump(XmlDump): + """XML page dump for something larger, where a 7-Zip compressed copy + could save 75% of download time for some users.""" + + def build_eta(self, runner): + """Tell the dumper script whether to make ETA estimate on page or revision count.""" + return "--full" diff --git a/xmldumps-backup/dumps/xmljobs.py b/xmldumps-backup/dumps/xmljobs.py index 342aa90..cc70181 100644 --- a/xmldumps-backup/dumps/xmljobs.py +++ b/xmldumps-backup/dumps/xmljobs.py @@ -1,18 +1,12 @@ ''' -All xml dump jobs are defined here +All xml dump jobs except content dump jobs are defined here ''' -import re -import os from os.path import exists -import signal -from dumps.CommandManagement import CommandPipeline from dumps.exceptions import BackupError -from dumps.fileutils import DumpFile, DumpFilename -from dumps.utils import MultiVersion, MiscUtils +from dumps.fileutils import DumpFilename from dumps.jobs import Dump -from dumps.WikiDump import Locker def batcher(items, batchsize): @@ -193,759 +187,6 @@ callback_stderr_arg=runner) if error: raise BackupError("error dumping log files") - - -class XmlDump(Dump): - """Primary XML dumps, one section at a time.""" - def __init__(self, subset, name, desc, detail, item_for_stubs, prefetch, - prefetchdate, spawn, - wiki, partnum_todo, parts=False, checkpoints=False, checkpoint_file=None, - page_id_range=None, verbose=False): - self._subset = subset - self._detail = detail - self._desc = desc - self._prefetch = prefetch - self._prefetchdate = prefetchdate - self._spawn = spawn - self._parts = parts - if self._parts: - self._parts_enabled = True - self.onlyparts = True - self._page_id = {} - self._partnum_todo = partnum_todo - - self.wiki = wiki - self.item_for_stubs = item_for_stubs - if checkpoints: - self._checkpoints_enabled = True - self.checkpoint_file = checkpoint_file - self.page_id_range = page_id_range - self.verbose = verbose - self._prerequisite_items = [self.item_for_stubs] - self._check_truncation = True - Dump.__init__(self, name, desc, self.verbose) - - def get_dumpname_base(self): - return 'pages-' - - def get_dumpname(self): - return self.get_dumpname_base() + self._subset - - def get_filetype(self): - return "xml" - - def get_file_ext(self): - return "bz2" - - def get_stub_files(self, runner, partnum=None): - ''' - get the stub files pertaining to our dumpname, which is *one* of - articles, pages-current, pages-history. - stubs include all of these together. - we will either return the one full stubs file that exists - or the one stub file part, if we are (re)running a specific - file part (subjob), or all file parts if we are (re)running - the entire job which is configured for subjobs. 
- - arguments: - runner - Runner object - partnum (int) - number of file part (subjob) if any - ''' - if partnum is None: - partnum = self._partnum_todo - if not self.dumpname.startswith(self.get_dumpname_base()): - raise BackupError("dumpname %s of unknown form for this job" % self.dumpname) - - dumpname = self.dumpname[len(self.get_dumpname_base()):] - stub_dumpnames = self.item_for_stubs.list_dumpnames() - for sname in stub_dumpnames: - if sname.endswith(dumpname): - stub_dumpname = sname - input_files = self.item_for_stubs.list_outfiles_for_input(runner.dump_dir, [stub_dumpname]) - if self._parts_enabled: - if partnum is not None: - for inp_file in input_files: - if inp_file.partnum_int == partnum: - input_files = [inp_file] - break - return input_files - - def get_chkptfile_from_pageids(self): - if ',' in self.page_id_range: - first_page_id, last_page_id = self.page_id_range.split(',', 1) - else: - first_page_id = self.page_id_range - last_page_id = "00000" # indicates no last page id specified, go to end of stub - checkpoint_string = DumpFilename.make_checkpoint_string(first_page_id, last_page_id) - if self._partnum_todo: - partnum = self._partnum_todo - else: - # fixme is that right? maybe NOT - partnum = None - fileobj = DumpFilename(self.get_dumpname(), self.wiki.date, self.get_filetype(), - self.get_file_ext(), partnum, checkpoint_string) - return fileobj.filename - - def get_missing_before(self, needed_range, have_range): - # given range of numbers needed and range of numbers we have, - # return range of numbers needed before first number we have, - # or None if none - if have_range is None: - return needed_range - elif needed_range is None or int(have_range[0]) <= int(needed_range[0]): - return None - else: - return (needed_range[0], str(int(have_range[0]) - 1), needed_range[2]) - - def find_missing_ranges(self, needed, have): - # given list tuples of ranges of numbers needed, and ranges of numbers we have, - # determine the ranges of numbers missing and return list of said tuples - needed_index = 0 - have_index = 0 - missing = [] - - if not needed: - return missing - if not have: - return needed - - needed_range = needed[needed_index] - have_range = have[have_index] - - while True: - # if we're out of haves, append everything we need - if have_range is None: - missing.append(needed_range) - needed_index += 1 - if needed_index < len(needed): - needed_range = needed[needed_index] - else: - # end of needed. done - return missing - - before_have = self.get_missing_before(needed_range, have_range) - - # write anything we don't have - if before_have is not None: - missing.append(before_have) - - # if we haven't already exhausted all the ranges we have... - if have_range is not None: - # skip over the current range of what we have - skip_up_to = str(int(have_range[1]) + 1) - while int(needed_range[1]) < int(skip_up_to): - needed_index += 1 - if needed_index < len(needed): - needed_range = needed[needed_index] - else: - # end of needed. 
done - return missing - - if int(needed_range[0]) < int(skip_up_to): - needed_range = (skip_up_to, needed_range[1], needed_range[2]) - - # get the next range we have - have_index += 1 - if have_index < len(have): - have_range = have[have_index] - else: - have_range = None - - return missing - - def chkpt_file_from_page_range(self, page_range, partnum): - checkpoint_string = DumpFilename.make_checkpoint_string( - page_range[0], page_range[1]) - output_file = DumpFilename(self.wiki, self.wiki.date, self.dumpname, - self.get_filetype(), self.get_file_ext(), - partnum, checkpoint=checkpoint_string, - temp=False) - return output_file - - def chkptfile_in_pagerange(self, fobj, chkpt_fobj): - """return False if both files are checkpoint files (with page ranges) - and the second file page range does not overlap with the first one""" - # not both checkpoint files: - if not fobj.is_checkpoint_file or not chkpt_fobj.is_checkpoint_file: - return True - # one or both end values are missing: - if not fobj.last_page_id and not chkpt_fobj.last_page_id: - return True - elif not fobj.last_page_id and int(chkpt_fobj.last_page_id) < int(fobj.first_page_id): - return True - elif not chkpt_fobj.last_page_id and int(fobj.last_page_id) < int(chkpt_fobj.first_page_id): - return True - # have end values for both files: - elif (int(fobj.first_page_id) <= int(chkpt_fobj.first_page_id) and - int(chkpt_fobj.first_page_id) <= int(fobj.last_page_id)): - return True - elif (int(chkpt_fobj.first_page_id) <= int(fobj.first_page_id) and - int(fobj.first_page_id) <= int(chkpt_fobj.last_page_id)): - return True - else: - return False - - def cleanup_tmp_files(self, dump_dir, runner): - """ - with checkpoint files turned on, this job writes output - to <something>.xml<-maybemorestuff>.bz2-tmp - and if those files are lying around after such a job dies, - we should clean them up - """ - if "cleanup_tmp_files" not in runner.enabled: - return - - # if we don't have the lock it's possible some - # other process is writing tmp files, don't touch - locker = Locker(self.wiki, self.wiki.date) - lockfiles = locker.is_locked() - if not lockfiles: - return - if len(lockfiles) > 1: - # more than one process with the lock? should not - # be possible, but if it is... touch nothing! - return - if not locker.check_owner(lockfiles[0], str(os.getpid())): - return - - to_delete = self.get_tmp_files(dump_dir) - for finfo in to_delete: - if exists(dump_dir.filename_public_path(finfo)): - os.remove(dump_dir.filename_public_path(finfo)) - elif exists(dump_dir.filename_private_path(finfo)): - os.remove(dump_dir.filename_private_path(finfo)) - - def run(self, runner): - # here we will either clean up or not depending on how we were called - # FIXME callers should set this appropriately and they don't right now - self.cleanup_old_files(runner.dump_dir, runner) - - # clean up all tmp output files from previous attempts of this job - # for this dump wiki and date, otherwise we'll wind up indexing - # them and hashsumming them etc. 
- # they may have been left around from an interrupted or failed earlier - # run - self.cleanup_tmp_files(runner.dump_dir, runner) - - commands = [] - - todo = [] - - if self.page_id_range is not None: - # convert to checkpoint filename, handle the same way - self.checkpoint_file = self.get_chkptfile_from_pageids() - - if self.checkpoint_file: - todo = [self.checkpoint_file] - else: - # list all the output files that would be produced w/o - # checkpoint files on - outfiles = self.get_reg_files_for_filepart_possible( - runner.dump_dir, self.get_fileparts_list(), self.list_dumpnames()) - if self._checkpoints_enabled: - # get the stub list that would be used for the current run - stubs = self.get_stub_files(runner) - stubs = sorted(stubs, key=lambda thing: thing.filename) - - # get the page ranges covered by stubs - stub_ranges = [] - for stub in stubs: - fname = DumpFile(self.wiki, - runner.dump_dir.filename_public_path(stub, stub.date), - stub, self.verbose) - stub_ranges.append((fname.find_first_page_id_in_file(), - self.find_last_page_id(stub, runner), stub.partnum)) - - # get list of existing checkpoint files - chkpt_files = self.list_checkpt_files( - runner.dump_dir, [self.dumpname], runner.wiki.date, parts=None) - chkpt_files = sorted(chkpt_files, key=lambda thing: thing.filename) - # get the page ranges covered by existing checkpoint files - checkpoint_ranges = [(chkptfile.first_page_id, chkptfile.last_page_id, - chkptfile.partnum) - for chkptfile in chkpt_files] - if self.verbose: - print "checkpoint_ranges is", checkpoint_ranges - print "stub_ranges is", stub_ranges - - if not checkpoint_ranges: - # no page ranges covered by checkpoints. do all output files - # the usual way - todo = outfiles - else: - todo = [] - missing_ranges = self.find_missing_ranges(stub_ranges, checkpoint_ranges) - parts = self.get_fileparts_list() - for partnum in parts: - if not [1 for chkpt_range in checkpoint_ranges - if int(chkpt_range[2]) == partnum]: - # entire page range for a particular file part (subjob) - # is missing so generate the regular output file - todo.extend([outfile for outfile in outfiles - if int(outfile.partnum) == partnum]) - else: - # at least some page ranges are covered, just do those that - # are missing (maybe none are and list is empty) - todo.extend([self.chkpt_file_from_page_range((first, last), part) - for (first, last, part) in missing_ranges - if int(part) == partnum]) - else: - # do the missing files only - todo = [outfile for outfile in outfiles - if not os.path.exists(runner.dump_dir.filename_public_path(outfile))] - - partial_stubs = [] - if self.verbose: - print "todo is", [to.filename for to in todo] - - for fileobj in todo: - - stub_for_file = self.get_stub_files(runner, fileobj.partnum_int)[0] - - if fileobj.first_page_id is None: - partial_stubs.append(stub_for_file) - else: - stub_output_file = DumpFilename( - self.wiki, fileobj.date, fileobj.dumpname, - self.item_for_stubs.get_filetype(), - self.item_for_stubs.get_file_ext(), - fileobj.partnum, - DumpFilename.make_checkpoint_string( - fileobj.first_page_id, fileobj.last_page_id), temp=True) - - self.write_partial_stub(stub_for_file, stub_output_file, runner) - if not self.has_no_entries(stub_output_file, runner): - partial_stubs.append(stub_output_file) - - if self.verbose: - print "partial_stubs is", [ps.filename for ps in partial_stubs] - if partial_stubs: - stub_files = partial_stubs - else: - return - - for stub_file in stub_files: - series = self.build_command(runner, stub_file) - commands.append(series) - - 
error = runner.run_command(commands, callback_stderr=self.progress_callback, - callback_stderr_arg=runner) - if error: - raise BackupError("error producing xml file(s) %s" % self.dumpname) - - def has_no_entries(self, xmlfile, runner): - ''' - see if it has a page id in it or not. no? then return True - ''' - if xmlfile.is_temp_file: - path = os.path.join(self.wiki.config.temp_dir, xmlfile.filename) - else: - path = runner.dump_dir.filename_public_path(xmlfile, self.wiki.date) - fname = DumpFile(self.wiki, path, xmlfile, self.verbose) - return bool(fname.find_first_page_id_in_file() is None) - - def build_eta(self, runner): - """Tell the dumper script whether to make ETA estimate on page or revision count.""" - return "--current" - - # takes name of the output file - def build_filters(self, runner, inp_file): - """Construct the output filter options for dumpTextPass.php""" - # do we need checkpoints? ummm - xmlbz2 = runner.dump_dir.filename_public_path(inp_file) - - if not exists(self.wiki.config.bzip2): - raise BackupError("bzip2 command %s not found" % self.wiki.config.bzip2) - if self.wiki.config.bzip2[-6:] == "dbzip2": - bz2mode = "dbzip2" - else: - bz2mode = "bzip2" - return "--output=%s:%s" % (bz2mode, xmlbz2) - - def write_partial_stub(self, input_file, output_file, runner): - if not exists(self.wiki.config.writeuptopageid): - raise BackupError("writeuptopageid command %s not found" % - self.wiki.config.writeuptopageid) - - inputfile_path = runner.dump_dir.filename_public_path(input_file) - output_file_path = os.path.join(self.wiki.config.temp_dir, output_file.filename) - if input_file.file_ext == "gz": - command1 = "%s -dc %s" % (self.wiki.config.gzip, inputfile_path) - command2 = "%s > %s" % (self.wiki.config.gzip, output_file_path) - elif input_file.file_ext == '7z': - command1 = "%s e -si %s" % (self.wiki.config.sevenzip, inputfile_path) - command2 = "%s e -so %s" % (self.wiki.config.sevenzip, output_file_path) - elif input_file.file_ext == 'bz': - command1 = "%s -dc %s" % (self.wiki.config.bzip2, inputfile_path) - command2 = "%s > %s" % (self.wiki.config.bzip2, output_file_path) - else: - raise BackupError("unknown stub file extension %s" % input_file.file_ext) - if output_file.last_page_id is not None and output_file.last_page_id is not "00000": - command = [command1 + ("| %s %s %s |" % (self.wiki.config.writeuptopageid, - output_file.first_page_id, - output_file.last_page_id)) + command2] - else: - # no lastpageid? 
read up to eof of the specific stub file that's used for input - command = [command1 + ("| %s %s |" % (self.wiki.config.writeuptopageid, - output_file.first_page_id)) + command2] - - pipeline = [command] - series = [pipeline] - error = runner.run_command([series], shell=True) - if error: - raise BackupError("failed to write partial stub file %s" % output_file.filename) - - def get_last_lines_from_n(self, fileobj, runner, count): - if not fileobj.filename or not exists(runner.dump_dir.filename_public_path(fileobj)): - return None - - dumpfile = DumpFile(self.wiki, - runner.dump_dir.filename_public_path(fileobj, self.wiki.date), - fileobj, self.verbose) - pipeline = dumpfile.setup_uncompression_command() - - tail = self.wiki.config.tail - if not exists(tail): - raise BackupError("tail command %s not found" % tail) - tail_esc = MiscUtils.shell_escape(tail) - pipeline.append([tail, "-n", "+%s" % count]) - # without shell - proc = CommandPipeline(pipeline, quiet=True) - proc.run_pipeline_get_output() - last_lines = "" - if (proc.exited_successfully() or - (proc.get_failed_cmds_with_retcode() == - [[-signal.SIGPIPE, pipeline[0]]]) or - (proc.get_failed_cmds_with_retcode() == - [[signal.SIGPIPE + 128, pipeline[0]]])): - last_lines = proc.output() - return last_lines - - def get_lineno_last_page(self, fileobj, runner): - if not fileobj.filename or not exists(runner.dump_dir.filename_public_path(fileobj)): - return None - dumpfile = DumpFile(self.wiki, - runner.dump_dir.filename_public_path(fileobj, self.wiki.date), - fileobj, self.verbose) - pipeline = dumpfile.setup_uncompression_command() - grep = self.wiki.config.grep - if not exists(grep): - raise BackupError("grep command %s not found" % grep) - pipeline.append([grep, "-n", "<page>"]) - tail = self.wiki.config.tail - if not exists(tail): - raise BackupError("tail command %s not found" % tail) - pipeline.append([tail, "-1"]) - # without shell - proc = CommandPipeline(pipeline, quiet=True) - proc.run_pipeline_get_output() - if (proc.exited_successfully() or - (proc.get_failed_cmds_with_retcode() == - [[-signal.SIGPIPE, pipeline[0]]]) or - (proc.get_failed_cmds_with_retcode() == - [[signal.SIGPIPE + 128, pipeline[0]]])): - output = proc.output() - # 339915646: <page> - if ':' in output: - linecount = output.split(':')[0] - if linecount.isdigit(): - return linecount - return None - - def find_last_page_id(self, fileobj, runner): - count = self.get_lineno_last_page(fileobj, runner) - lastlines = self.get_last_lines_from_n(fileobj, runner, count) - # now look for the last page id in here. eww - if not lastlines: - return None - title_and_id_pattern = re.compile(r'<title>(?P<title>.+?)</title>\s*' + - r'(<ns>[0-9]+</ns>\s*)?' + - r'<id>(?P<pageid>\d+?)</id>') - result = None - for result in re.finditer(title_and_id_pattern, lastlines): - pass - if result is not None: - return result.group('pageid') - else: - return None - - def build_command(self, runner, stub_file): - """Build the command line for the dump, minus output and filter options""" - - # we write a temp file, it will be checkpointed every so often. 
- temp = bool(self._checkpoints_enabled) - - output_file = DumpFilename(self.wiki, stub_file.date, self.dumpname, - self.get_filetype(), self.file_ext, stub_file.partnum, - DumpFilename.make_checkpoint_string(stub_file.first_page_id, - stub_file.last_page_id), - temp) - - stub_path = os.path.join(self.wiki.config.temp_dir, stub_file.filename) - if os.path.exists(stub_path): - # if this is a partial stub file in temp dir, use that - stub_option = "--stub=gzip:%s" % stub_path - else: - # use regular stub file - stub_option = "--stub=gzip:%s" % runner.dump_dir.filename_public_path(stub_file) - - # Try to pull text from the previous run; most stuff hasn't changed - # Source=$OutputDir/pages_$section.xml.bz2 - sources = [] - possible_sources = None - if self._prefetch: - possible_sources = self._find_previous_dump(runner, output_file.partnum) - # if we have a list of more than one then - # we need to check existence for each and put them together in a string - if possible_sources: - for sourcefile in possible_sources: - # if we are doing partial stub run, include only the analogous - # checkpointed prefetch files, if there are checkpointed files - # otherwise we'll use the all the sourcefiles reported - if not self.chkptfile_in_pagerange(stub_file, sourcefile): - continue - sname = runner.dump_dir.filename_public_path(sourcefile, sourcefile.date) - if exists(sname): - sources.append(sname) - if output_file.partnum: - partnum_str = "%s" % stub_file.partnum - else: - partnum_str = "" - if len(sources) > 0: - source = "bzip2:%s" % (";".join(sources)) - runner.show_runner_state("... building %s %s XML dump, with text prefetch from %s..." % - (self._subset, partnum_str, source)) - prefetch = "--prefetch=%s" % (source) - else: - runner.show_runner_state("... building %s %s XML dump, no text prefetch..." % - (self._subset, partnum_str)) - prefetch = "" - - if self._spawn: - spawn = "--spawn=%s" % (self.wiki.config.php) - else: - spawn = "" - - if not exists(self.wiki.config.php): - raise BackupError("php command %s not found" % self.wiki.config.php) - - if self._checkpoints_enabled: - checkpoint_time = "--maxtime=%s" % (self.wiki.config.checkpoint_time) - checkpoint_file = "--checkpointfile=%s" % output_file.new_filename( - output_file.dumpname, output_file.file_type, output_file.file_ext, - output_file.date, output_file.partnum, "p%sp%s", None) - else: - checkpoint_time = "" - checkpoint_file = "" - script_command = MultiVersion.mw_script_as_array(runner.wiki.config, "dumpTextPass.php") - dump_command = [self.wiki.config.php] - dump_command.extend(script_command) - dump_command.extend(["--wiki=%s" % runner.db_name, - "%s" % stub_option, - "%s" % prefetch, - "%s" % checkpoint_time, - "%s" % checkpoint_file, - "--report=1000", - "%s" % spawn]) - - dump_command = [entry for entry in dump_command if entry is not None] - command = dump_command - filters = self.build_filters(runner, output_file) - eta = self.build_eta(runner) - command.extend([filters, eta]) - pipeline = [command] - series = [pipeline] - return series - - # taken from a comment by user "Toothy" on Ned Batchelder's blog (no longer on the net) - def sort_nicely(self, mylist): - """ Sort the given list in the way that humans expect. 
- """ - convert = lambda text: int(text) if text.isdigit() else text - alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)] - mylist = sorted(mylist, key=alphanum_key) - - def get_relevant_prefetch_files(self, file_list, start_page_id, end_page_id, date, runner): - possibles = [] - if len(file_list): - # (a) nasty hack, see below (b) - maxparts = 0 - for file_obj in file_list: - if file_obj.is_file_part and file_obj.partnum_int > maxparts: - maxparts = file_obj.partnum_int - if not file_obj.first_page_id: - fname = DumpFile( - self.wiki, runner.dump_dir.filename_public_path(file_obj, date), - file_obj, self.verbose) - file_obj.first_page_id = fname.find_first_page_id_in_file() - - # get the files that cover our range - for file_obj in file_list: - # If some of the file_objs in file_list could not be properly be parsed, some of - # the (int) conversions below will fail. However, it is of little use to us, - # which conversion failed. /If any/ conversion fails, it means, that that we do - # not understand how to make sense of the current file_obj. Hence we cannot use - # it as prefetch object and we have to drop it, to avoid passing a useless file - # to the text pass. (This could days as of a comment below, but by not passing - # a likely useless file, we have to fetch more texts from the database) - # - # Therefore try...except-ing the whole block is sufficient: If whatever error - # occurs, we do not abort, but skip the file for prefetch. - try: - # If we could properly parse - first_page_id_in_file = int(file_obj.first_page_id) - - # fixme what do we do here? this could be very expensive. is that worth it?? - if not file_obj.last_page_id: - # (b) nasty hack, see (a) - # it's not a checkpoint fle or we'd have the pageid in the filename - # so... temporary hack which will give expensive results - # if file part, and it's the last one, put none - # if it's not the last part, get the first pageid in the next - # part and subtract 1 - # if not file part, put none. - if file_obj.is_file_part and file_obj.partnum_int < maxparts: - for fname in file_list: - if fname.partnum_int == file_obj.partnum_int + 1: - # not true! this could be a few past where it really is - # (because of deleted pages that aren't included at all) - file_obj.last_page_id = str(int(fname.first_page_id) - 1) - if file_obj.last_page_id: - last_page_id_in_file = int(file_obj.last_page_id) - else: - last_page_id_in_file = None - - # FIXME there is no point in including files that have just a - # few rev ids in them that we need, and having to read through - # the whole file... could take hours or days (later it won't matter, - # right? but until a rewrite, this is important) - # also be sure that if a critical page is deleted by the time we - # try to figure out ranges, that we don't get hosed - if ((first_page_id_in_file <= int(start_page_id) and - (last_page_id_in_file is None or - last_page_id_in_file >= int(start_page_id))) or - (first_page_id_in_file >= int(start_page_id) and - (end_page_id is None or - first_page_id_in_file <= int(end_page_id)))): - possibles.append(file_obj) - except Exception as ex: - runner.debug( - "Couldn't process %s for prefetch. Format update? Corrupt file?" - % file_obj.filename) - return possibles - - # this finds the content file or files from the first previous successful dump - # to be used as input ("prefetch") for this run. 
- def _find_previous_dump(self, runner, partnum=None): - """The previously-linked previous successful dump.""" - if partnum: - start_page_id = sum([self._parts[i] for i in range(0, int(partnum) - 1)]) + 1 - if len(self._parts) > int(partnum): - end_page_id = sum([self._parts[i] for i in range(0, int(partnum))]) - else: - end_page_id = None - else: - start_page_id = 1 - end_page_id = None - - if self._prefetchdate: - dumps = [self._prefetchdate] - else: - dumps = self.wiki.dump_dirs() - dumps = sorted(dumps, reverse=True) - for date in dumps: - if date == self.wiki.date: - runner.debug("skipping current dump for prefetch of job %s, date %s" % - (self.name(), self.wiki.date)) - continue - - # see if this job from that date was successful - if not runner.dumpjobdata.runinfo.status_of_old_dump_is_done( - runner, date, self.name(), self._desc): - runner.debug("skipping incomplete or failed dump for prefetch date %s" % date) - continue - - # first check if there are checkpoint files from this run we can use - files = self.list_checkpt_files( - runner.dump_dir, [self.dumpname], date, parts=None) - possible_prefetch_list = self.get_relevant_prefetch_files( - files, start_page_id, end_page_id, date, runner) - if len(possible_prefetch_list): - return possible_prefetch_list - - # ok, let's check for file parts instead, from any run - # (may not conform to our numbering for this job) - files = self.list_reg_files( - runner.dump_dir, [self.dumpname], date, parts=True) - possible_prefetch_list = self.get_relevant_prefetch_files( - files, start_page_id, end_page_id, date, runner) - if len(possible_prefetch_list): - return possible_prefetch_list - - # last shot, get output file that contains all the pages, if there is one - files = self.list_reg_files(runner.dump_dir, [self.dumpname], - date, parts=False) - # there is only one, don't bother to check for relevance :-P - possible_prefetch_list = files - files = [] - for prefetch in possible_prefetch_list: - possible = runner.dump_dir.filename_public_path(prefetch, date) - size = os.path.getsize(possible) - if size < 70000: - runner.debug("small %d-byte prefetch dump at %s, skipping" % (size, possible)) - continue - else: - files.append(prefetch) - if len(files): - return files - - runner.debug("Could not locate a prefetchable dump.") - return None - - def get_tmp_files(self, dump_dir, dump_names=None): - files = Dump.list_outfiles_for_cleanup(self, dump_dir, dump_names) - return [fileinfo for fileinfo in files if fileinfo.is_temp_file] - - def list_outfiles_for_cleanup(self, dump_dir, dump_names=None): - files = Dump.list_outfiles_for_cleanup(self, dump_dir, dump_names) - files_to_return = [] - - if self.page_id_range: - # this file is for one page range only - if ',' in self.page_id_range: - (first_page_id, last_page_id) = self.page_id_range.split(',', 2) - first_page_id = int(first_page_id) - last_page_id = int(last_page_id) - else: - first_page_id = int(self.page_id_range) - last_page_id = None - - # checkpoint files cover specific page ranges. 
for those, - # list only files within the given page range for cleanup - for fname in files: - if fname.is_checkpoint_file: - if (not first_page_id or - (fname.first_page_id and - (int(fname.first_page_id) >= first_page_id))): - if (not last_page_id or - (fname.last_page_id and - (int(fname.last_page_id) <= last_page_id))): - files_to_return.append(fname) - else: - files_to_return.append(fname) - else: - files_to_return = files - - return files_to_return - - -class BigXmlDump(XmlDump): - """XML page dump for something larger, where a 7-Zip compressed copy - could save 75% of download time for some users.""" - - def build_eta(self, runner): - """Tell the dumper script whether to make ETA estimate on page or revision count.""" - return "--full" class AbstractDump(Dump): -- To view, visit https://gerrit.wikimedia.org/r/343539 To unsubscribe, visit https://gerrit.wikimedia.org/r/settings Gerrit-MessageType: merged Gerrit-Change-Id: I12fce108339a4a5af48037827f69b0b2aadecd25 Gerrit-PatchSet: 3 Gerrit-Project: operations/dumps Gerrit-Branch: master Gerrit-Owner: ArielGlenn <ar...@wikimedia.org> Gerrit-Reviewer: ArielGlenn <ar...@wikimedia.org> Gerrit-Reviewer: jenkins-bot <> _______________________________________________ MediaWiki-commits mailing list MediaWiki-commits@lists.wikimedia.org https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits