ArielGlenn has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/394857 )
Change subject: ability to do xmlpageslogging several pieces at a time in parallel
......................................................................

ability to do xmlpageslogging several pieces at a time in parallel

Bug: T181935
Change-Id: Icef0aa23c363d7fa4d3b09074571f02a9ed2d3c6
---
M xmldumps-backup/defaults.conf
M xmldumps-backup/dumps/WikiDump.py
M xmldumps-backup/dumps/recombinejobs.py
M xmldumps-backup/dumps/runner.py
M xmldumps-backup/dumps/runnerutils.py
M xmldumps-backup/dumps/utils.py
M xmldumps-backup/dumps/xmljobs.py
M xmldumps-backup/xmlstreams.py
8 files changed, 157 insertions(+), 15 deletions(-)

Approvals:
  ArielGlenn: Looks good to me, approved
  jenkins-bot: Verified


diff --git a/xmldumps-backup/defaults.conf b/xmldumps-backup/defaults.conf
index e95ac47..222cb2d 100644
--- a/xmldumps-backup/defaults.conf
+++ b/xmldumps-backup/defaults.conf
@@ -60,6 +60,8 @@
 pagesPerChunkHistory=0
 revsPerChunkHistory=0
 pagesPerChunkAbstract=0
+chunksForPagelogs=0
+logitemsPerPagelogs=0
 jobsperbatch=""
 revsPerJob=1000000
 maxRetries=0
diff --git a/xmldumps-backup/dumps/WikiDump.py b/xmldumps-backup/dumps/WikiDump.py
index 8ba3838..005f858 100644
--- a/xmldumps-backup/dumps/WikiDump.py
+++ b/xmldumps-backup/dumps/WikiDump.py
@@ -282,6 +282,10 @@
             "chunks", "chunksForAbstract", 0)
         self.pages_per_filepart_abstract = self.get_opt_for_proj_or_default(
             "chunks", "pagesPerChunkAbstract", 0)
+        self.numparts_for_pagelogs = self.get_opt_for_proj_or_default(
+            "chunks", "chunksForPagelogs", 0)
+        self.logitems_per_filepart_pagelogs = self.get_opt_for_proj_or_default(
+            "chunks", "logitemsPerPagelogs", 0)
         self.recombine_history = self.get_opt_for_proj_or_default(
             "chunks", "recombineHistory", 1)
         self.checkpoint_time = self.get_opt_for_proj_or_default(
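
The two new settings mirror the existing abstract-dump knobs: chunksForPagelogs asks for a fixed number of fileparts sized from the live log count, while logitemsPerPagelogs pins explicit per-part sizes as a comma-separated list. A minimal sketch of how such a list is parsed, assuming split-on-comma/int behavior implied by the method name (only its empty-string branch appears in the utils.py hunk further down):

    # Sketch only: the body of convert_comma_sep beyond the empty-string
    # check is not shown in this diff, so splitting on commas and converting
    # each field to int is an assumption.
    def convert_comma_sep(line):
        if line == "":
            return False
        return [int(field) for field in line.split(",")]

    convert_comma_sep("100000,100000,250000")  # [100000, 100000, 250000]
    convert_comma_sep("")                      # False, i.e. no explicit sizing
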
diff --git a/xmldumps-backup/dumps/recombinejobs.py b/xmldumps-backup/dumps/recombinejobs.py
index 6d1f0a1..929f23a 100644
--- a/xmldumps-backup/dumps/recombinejobs.py
+++ b/xmldumps-backup/dumps/recombinejobs.py
@@ -329,3 +329,56 @@
             error = result
     if error:
         raise BackupError("error recombining abstract dump files")
+
+
+class RecombineXmlLoggingDump(RecombineDump):
+    def __init__(self, name, desc, item_for_recombine):
+        # no partnum_todo, no parts generally (False, False), even though input may have it
+        self.item_for_recombine = item_for_recombine
+        self._prerequisite_items = [self.item_for_recombine]
+        super(RecombineXmlLoggingDump, self).__init__(name, desc)
+
+    def get_filetype(self):
+        return self.item_for_recombine.get_filetype()
+
+    def get_file_ext(self):
+        return self.item_for_recombine.get_file_ext()
+
+    def get_dumpname(self):
+        return self.item_for_recombine.get_dumpname()
+
+    def build_command(self, runner, to_recombine_dfnames, output_dfname):
+        input_dfnames = []
+        for in_dfname in to_recombine_dfnames:
+            if in_dfname.dumpname == output_dfname.dumpname:
+                input_dfnames.append(in_dfname)
+        if not len(input_dfnames):
+            self.set_status("failed")
+            raise BackupError("No input files for %s found" % self.name())
+        if not exists(runner.wiki.config.gzip):
+            raise BackupError("gzip command %s not found" % runner.wiki.config.gzip)
+        compression_command = "%s > " % runner.wiki.config.gzip
+        uncompression_command = ["%s" % runner.wiki.config.gzip, "-dc"]
+        recombine_command_string = self.build_recombine_command_string(
+            runner, input_dfnames, output_dfname, compression_command,
+            uncompression_command)
+        recombine_command = [recombine_command_string]
+        recombine_pipeline = [recombine_command]
+        series = [recombine_pipeline]
+        return series
+
+    def run(self, runner):
+        error = 0
+        to_recombine_dfnames = self.item_for_recombine.list_outfiles_for_input(runner.dump_dir)
+        output_dfnames = self.list_outfiles_for_build_command(runner.dump_dir)
+        for output_dfname in output_dfnames:
+            command_series = self.build_command(runner, to_recombine_dfnames, output_dfname)
+            self.setup_command_info(runner, command_series, [output_dfname])
+            result, broken = runner.run_command(
+                [command_series], callback_timed=self.progress_callback,
+                callback_timed_arg=runner, shell=True,
+                callback_on_completion=self.command_completion_callback)
+            if result:
+                error = result
+        if error:
+            raise BackupError("error recombining log event files")
diff --git a/xmldumps-backup/dumps/runner.py b/xmldumps-backup/dumps/runner.py
index 662f581..e0dfaca 100644
--- a/xmldumps-backup/dumps/runner.py
+++ b/xmldumps-backup/dumps/runner.py
@@ -13,7 +13,7 @@
 from dumps.apijobs import SiteInfoDump
 from dumps.tablesjobs import PrivateTable, PublicTable, TitleDump, AllTitleDump
 from dumps.recombinejobs import RecombineAbstractDump, RecombineXmlDump
-from dumps.recombinejobs import RecombineXmlStub, RecombineXmlRecompressDump
+from dumps.recombinejobs import RecombineXmlStub, RecombineXmlRecompressDump, RecombineXmlLoggingDump
 from dumps.xmljobs import XmlLogging, XmlStub, AbstractDump
 from dumps.xmlcontentjobs import XmlDump, BigXmlDump
 from dumps.recompressjobs import XmlMultiStreamDump, XmlRecompressDump
@@ -266,7 +266,14 @@
                 self.find_item_by_name('metacurrentdump')))

         self.dump_items.append(
-            XmlLogging("Log events to all pages and users."))
+            XmlLogging("Log events to all pages and users.",
+                       self._get_partnum_todo("xmlpagelogsdump"),
+                       get_int_setting(self.jobsperbatch, "xmlpagelogsdump"),
+                       self.filepart.get_logitems_per_filepart_pagelogs()))
+
+        self.append_job_if_needed(RecombineXmlLoggingDump(
+            "xmlpagelogsdumprecombine", "Recombine Log events to all pages and users",
+            self.find_item_by_name('xmlpagelogsdump')))

         self.append_job_if_needed(
             FlowDump("xmlflowdump", "content of flow pages in xml format"))
diff --git a/xmldumps-backup/dumps/runnerutils.py b/xmldumps-backup/dumps/runnerutils.py
index 10b0295..12a77a5 100644
--- a/xmldumps-backup/dumps/runnerutils.py
+++ b/xmldumps-backup/dumps/runnerutils.py
@@ -841,6 +841,7 @@
                     self.wiki.config.pages_per_filepart_history,
                     self.wiki.config.revs_per_filepart_history,
                     self.wiki.config.numparts_for_abstract,
+                    self.wiki.config.numparts_for_pagelogs,
                     self.wiki.config.pages_per_filepart_abstract,
                     self.wiki.config.recombine_history,
                     self.wiki.config.checkpoint_time]
@@ -894,9 +895,10 @@
             self.wiki.config.pages_per_filepart_history = settings[1]
             self.wiki.config.revs_per_filepart_history = settings[2]
             self.wiki.config.numparts_for_abstract = settings[3]
-            self.wiki.config.pages_per_filepart_abstract = settings[4]
-            self.wiki.config.recombine_history = settings[5]
-            self.wiki.config.checkpoint_time = settings[6]
+            self.wiki.config.numparts_for_pagelogs = settings[4]
+            self.wiki.config.pages_per_filepart_abstract = settings[5]
+            self.wiki.config.recombine_history = settings[6]
+            self.wiki.config.checkpoint_time = settings[7]


 class DumpRunJobData(object):
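
The renumbering in the second runnerutils.py hunk follows directly from the first: the new value is serialized right after numparts_for_abstract, so everything behind it shifts down by one index. Laid out explicitly (settings[0] precedes these and is not shown in the diff):

    # Serialization order of the filepart settings after this change.
    settings_fields = [
        "pages_per_filepart_history",   # settings[1]
        "revs_per_filepart_history",    # settings[2]
        "numparts_for_abstract",        # settings[3]
        "numparts_for_pagelogs",        # settings[4], new in this change
        "pages_per_filepart_abstract",  # settings[5], was settings[4]
        "recombine_history",            # settings[6], was settings[5]
        "checkpoint_time",              # settings[7], was settings[6]
    ]
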
diff --git a/xmldumps-backup/dumps/utils.py b/xmldumps-backup/dumps/utils.py
index 9c2f040..2bdc291 100644
--- a/xmldumps-backup/dumps/utils.py
+++ b/xmldumps-backup/dumps/utils.py
@@ -463,6 +463,7 @@
 class PageAndEditStats(object):
     def __init__(self, wiki, db_name, error_callback=None):
         self.total_pages = None
+        self.total_logitems = None
         self.total_edits = None
         self.wiki = wiki
         self.db_name = db_name
@@ -487,6 +488,7 @@
         lines = results.splitlines()
         if lines and lines[1]:
             self.total_pages = int(lines[1])
+
         query = "select MAX(rev_id) from %srevision;" % self.db_server_info.db_table_prefix
         retries = 0
         results = None
@@ -501,10 +503,29 @@
         lines = results.splitlines()
         if lines and lines[1]:
             self.total_edits = int(lines[1])
+
+        query = "select MAX(log_id) from %slogging;" % self.db_server_info.db_table_prefix
+        retries = 0
+        results = None
+        results = self.db_server_info.run_sql_and_get_output(query)
+        while results is None and retries < maxretries:
+            retries = retries + 1
+            time.sleep(5)
+            results = self.db_server_info.run_sql_and_get_output(query)
+        if not results:
+            return 1
+
+        lines = results.splitlines()
+        if lines and lines[1]:
+            self.total_logitems = int(lines[1])
+
         return 0

     def get_total_pages(self):
         return self.total_pages
+
+    def get_total_logitems(self):
+        return self.total_logitems

     def get_total_edits(self):
         return self.total_edits
@@ -548,6 +569,17 @@
             self._pages_per_filepart_abstract = self.convert_comma_sep(
                 self.wiki.config.pages_per_filepart_abstract)

+            if (self.wiki.config.numparts_for_pagelogs and
+                    self.wiki.config.numparts_for_pagelogs != "0"):
+                # we add 200 padding to cover new log entries that may be added
+                logitems_per_filepart = 200 + self.stats.total_logitems / int(
+                    self.wiki.config.numparts_for_pagelogs)
+                self._logitems_per_filepart_pagelogs = [logitems_per_filepart for i in range(
+                    0, int(self.wiki.config.numparts_for_pagelogs))]
+            else:
+                self._logitems_per_filepart_pagelogs = self.convert_comma_sep(
+                    self.wiki.config.logitems_per_filepart_pagelogs)
+
             self._pages_per_filepart_history = self.convert_comma_sep(
                 self.wiki.config.pages_per_filepart_history)
             self._revs_per_filepart_history = self.convert_comma_sep(
@@ -557,6 +589,7 @@
             self._pages_per_filepart_history = False
             self._revs_per_filepart_history = False
             self._pages_per_filepart_abstract = False
+            self._logitems_per_filepart_pagelogs = False
             self._recombine_history = False
         if self._parts_enabled:
             if self._revs_per_filepart_history:
@@ -595,6 +628,18 @@
             else:
                 self._num_parts_abstract = 0

+            if self._logitems_per_filepart_pagelogs:
+                if (len(self._logitems_per_filepart_pagelogs) == 1 and
+                        self._logitems_per_filepart_pagelogs[0]):
+                    self._num_parts_pagelogs = self.get_num_parts_for_xml_dumps(
+                        self.stats.total_logitems, self._logitems_per_filepart_pagelogs[0])
+                    self._logitems_per_filepart_pagelogs = [self._logitems_per_filepart_pagelogs[0]
+                                                            for i in range(self._num_parts_pagelogs)]
+                else:
+                    self._num_parts_pagelogs = len(self._logitems_per_filepart_pagelogs)
+            else:
+                self._num_parts_pagelogs = 0
+
     def convert_comma_sep(self, line):
         if line == "":
             return False
@@ -608,6 +653,9 @@
     def get_pages_per_filepart_abstract(self):
         return self._pages_per_filepart_abstract

+    def get_logitems_per_filepart_pagelogs(self):
+        return self._logitems_per_filepart_pagelogs
+
     def get_num_parts_abstract(self):
         return self._num_parts_abstract
diff --git a/xmldumps-backup/dumps/xmljobs.py b/xmldumps-backup/dumps/xmljobs.py
index 4419d42..3a59c28 100644
--- a/xmldumps-backup/dumps/xmljobs.py
+++ b/xmldumps-backup/dumps/xmljobs.py
@@ -195,7 +195,13 @@
 class XmlLogging(Dump):
     """ Create a logging dump of all page activity """

-    def __init__(self, desc, parts=False):
+    def __init__(self, desc, partnum_todo, jobsperbatch=None, parts=False):
+        self._partnum_todo = partnum_todo
+        self.jobsperbatch = jobsperbatch
+        self._parts = parts
+        if self._parts:
+            self._parts_enabled = True
+            self.onlyparts = True
         Dump.__init__(self, "xmlpagelogsdump", desc)

     def detail(self):
@@ -229,6 +235,19 @@
                    config_file_arg, "--wiki", runner.db_name,
                    "--outfile", DumpFilename.get_inprogress_name(logging_path)]

+        if output_dfname.partnum:
+            # set up start and end log item ids for this piece
+            # note there is no log item id 0, so we start with 1
+            start = sum([self._parts[i] for i in range(0, output_dfname.partnum_int - 1)]) + 1
+            startopt = "--start=%s" % start
+            # if we are on the last file part, we should get up to the last log item id,
+            # whatever that is.
+            command.append(startopt)
+            if output_dfname.partnum_int < len(self._parts):
+                end = sum([self._parts[i] for i in range(0, output_dfname.partnum_int)]) + 1
+                endopt = "--end=%s" % end
+                command.append(endopt)
+
         pipeline = [command]
         series = [pipeline]
         return series
@@ -236,14 +255,19 @@
     def run(self, runner):
         self.cleanup_old_files(runner.dump_dir, runner)
         dfnames = self.list_outfiles_for_build_command(runner.dump_dir)
-        if len(dfnames) > 1:
-            raise BackupError("logging table job wants to produce more than one output file")
-        output_dfname = dfnames[0]
-        command_series = self.build_command(runner, output_dfname)
-        self.setup_command_info(runner, command_series, [output_dfname])
-        error, broken = runner.run_command([command_series], callback_stderr=self.progress_callback,
-                                           callback_stderr_arg=runner,
-                                           callback_on_completion=self.command_completion_callback)
+        if self.jobsperbatch is not None:
+            maxjobs = self.jobsperbatch
+        else:
+            maxjobs = len(dfnames)
+        for batch in batcher(dfnames, maxjobs):
+            commands = []
+            for output_dfname in batch:
+                command_series = self.build_command(runner, output_dfname)
+                self.setup_command_info(runner, command_series, [output_dfname])
+                commands.append(command_series)
+            error, broken = runner.run_command(commands, callback_stderr=self.progress_callback,
+                                               callback_stderr_arg=runner,
+                                               callback_on_completion=self.command_completion_callback)
         if error:
             raise BackupError("error dumping log files")
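
Putting the utils.py arithmetic together with the --start/--end computation in XmlLogging.build_command above, a worked example with invented numbers (the codebase runs under Python 2, where "/" on ints truncates; written with "//" here so it truncates under Python 3 as well):

    total_logitems = 1000003   # MAX(log_id) from the logging table
    numparts = 4               # chunksForPagelogs
    # 200 extra per part absorbs log entries written while the dump runs
    per_part = 200 + total_logitems // numparts   # 250200
    parts = [per_part] * numparts

    def start_end(partnum_int):
        # mirrors build_command: log ids are 1-based, and the last part is
        # left open-ended so it picks up whatever the final log id is
        start = sum(parts[:partnum_int - 1]) + 1
        end = sum(parts[:partnum_int]) + 1 if partnum_int < len(parts) else None
        return start, end

    start_end(1)  # (1, 250201)
    start_end(4)  # (750601, None)
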
diff --git a/xmldumps-backup/xmlstreams.py b/xmldumps-backup/xmlstreams.py
index 6f4c1de..3163fba 100644
--- a/xmldumps-backup/xmlstreams.py
+++ b/xmldumps-backup/xmlstreams.py
@@ -42,7 +42,9 @@
     if interval is None:
         # hope this is not too awful a guess
         interval = (int(end) - int(start)) / 50
-    if interval > max_interval:
+    if interval == 0:
+        interval = 1
+    elif interval > max_interval:
         interval = max_interval

     interval_save = interval

--
To view, visit https://gerrit.wikimedia.org/r/394857
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Icef0aa23c363d7fa4d3b09074571f02a9ed2d3c6
Gerrit-PatchSet: 11
Gerrit-Project: operations/dumps
Gerrit-Branch: master
Gerrit-Owner: ArielGlenn <ar...@wikimedia.org>
Gerrit-Reviewer: ArielGlenn <ar...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>