ArielGlenn has submitted this change and it was merged. ( https://gerrit.wikimedia.org/r/394857 )

Change subject: ability to do xmlpageslogging several pieces at a time in parallel
......................................................................


ability to do xmlpageslogging several pieces at a time in parallel

Bug: T181935
Change-Id: Icef0aa23c363d7fa4d3b09074571f02a9ed2d3c6
---
M xmldumps-backup/defaults.conf
M xmldumps-backup/dumps/WikiDump.py
M xmldumps-backup/dumps/recombinejobs.py
M xmldumps-backup/dumps/runner.py
M xmldumps-backup/dumps/runnerutils.py
M xmldumps-backup/dumps/utils.py
M xmldumps-backup/dumps/xmljobs.py
M xmldumps-backup/xmlstreams.py
8 files changed, 157 insertions(+), 15 deletions(-)

Approvals:
  ArielGlenn: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/xmldumps-backup/defaults.conf b/xmldumps-backup/defaults.conf
index e95ac47..222cb2d 100644
--- a/xmldumps-backup/defaults.conf
+++ b/xmldumps-backup/defaults.conf
@@ -60,6 +60,8 @@
 pagesPerChunkHistory=0
 revsPerChunkHistory=0
 pagesPerChunkAbstract=0
+chunksForPagelogs=0
+logitemsPerPagelogs=0
 jobsperbatch=""
 revsPerJob=1000000
 maxRetries=0
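
For illustration only (not part of this change, hypothetical values): judging from the utils.py logic later in this patch, the two new settings can be used either to ask for a fixed number of roughly equal pieces, or to give explicit per-piece log item counts as a comma-separated list, e.g.

    # hypothetical: split the page logs dump into 4 roughly equal parts
    chunksForPagelogs=4
    # or, alternatively: explicit log item counts per part
    logitemsPerPagelogs=500000,500000,500000,600000
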
diff --git a/xmldumps-backup/dumps/WikiDump.py b/xmldumps-backup/dumps/WikiDump.py
index 8ba3838..005f858 100644
--- a/xmldumps-backup/dumps/WikiDump.py
+++ b/xmldumps-backup/dumps/WikiDump.py
@@ -282,6 +282,10 @@
             "chunks", "chunksForAbstract", 0)
         self.pages_per_filepart_abstract = self.get_opt_for_proj_or_default(
             "chunks", "pagesPerChunkAbstract", 0)
+        self.numparts_for_pagelogs = self.get_opt_for_proj_or_default(
+            "chunks", "chunksForPagelogs", 0)
+        self.logitems_per_filepart_pagelogs = self.get_opt_for_proj_or_default(
+            "chunks", "logitemsPerPagelogs", 0)
         self.recombine_history = self.get_opt_for_proj_or_default(
             "chunks", "recombineHistory", 1)
         self.checkpoint_time = self.get_opt_for_proj_or_default(
diff --git a/xmldumps-backup/dumps/recombinejobs.py b/xmldumps-backup/dumps/recombinejobs.py
index 6d1f0a1..929f23a 100644
--- a/xmldumps-backup/dumps/recombinejobs.py
+++ b/xmldumps-backup/dumps/recombinejobs.py
@@ -329,3 +329,56 @@
                 error = result
         if error:
             raise BackupError("error recombining abstract dump files")
+
+
+class RecombineXmlLoggingDump(RecombineDump):
+    def __init__(self, name, desc, item_for_recombine):
+        # no partnum_todo, no parts generally (False, False), even though input may have it
+        self.item_for_recombine = item_for_recombine
+        self._prerequisite_items = [self.item_for_recombine]
+        super(RecombineXmlLoggingDump, self).__init__(name, desc)
+
+    def get_filetype(self):
+        return self.item_for_recombine.get_filetype()
+
+    def get_file_ext(self):
+        return self.item_for_recombine.get_file_ext()
+
+    def get_dumpname(self):
+        return self.item_for_recombine.get_dumpname()
+
+    def build_command(self, runner, to_recombine_dfnames, output_dfname):
+        input_dfnames = []
+        for in_dfname in to_recombine_dfnames:
+            if in_dfname.dumpname == output_dfname.dumpname:
+                input_dfnames.append(in_dfname)
+        if not len(input_dfnames):
+            self.set_status("failed")
+            raise BackupError("No input files for %s found" % self.name())
+        if not exists(runner.wiki.config.gzip):
+            raise BackupError("gzip command %s not found" % 
runner.wiki.config.gzip)
+        compression_command = "%s > " % runner.wiki.config.gzip
+        uncompression_command = ["%s" % runner.wiki.config.gzip, "-dc"]
+        recombine_command_string = self.build_recombine_command_string(
+            runner, input_dfnames, output_dfname, compression_command,
+            uncompression_command)
+        recombine_command = [recombine_command_string]
+        recombine_pipeline = [recombine_command]
+        series = [recombine_pipeline]
+        return series
+
+    def run(self, runner):
+        error = 0
+        to_recombine_dfnames = self.item_for_recombine.list_outfiles_for_input(runner.dump_dir)
+        output_dfnames = self.list_outfiles_for_build_command(runner.dump_dir)
+        for output_dfname in output_dfnames:
+            command_series = self.build_command(runner, to_recombine_dfnames, output_dfname)
+            self.setup_command_info(runner, command_series, [output_dfname])
+            result, broken = runner.run_command(
+                [command_series], callback_timed=self.progress_callback,
+                callback_timed_arg=runner, shell=True,
+                callback_on_completion=self.command_completion_callback)
+            if result:
+                error = result
+        if error:
+            raise BackupError("error recombining log event files")
diff --git a/xmldumps-backup/dumps/runner.py b/xmldumps-backup/dumps/runner.py
index 662f581..e0dfaca 100644
--- a/xmldumps-backup/dumps/runner.py
+++ b/xmldumps-backup/dumps/runner.py
@@ -13,7 +13,7 @@
 from dumps.apijobs import SiteInfoDump
 from dumps.tablesjobs import PrivateTable, PublicTable, TitleDump, AllTitleDump
 from dumps.recombinejobs import RecombineAbstractDump, RecombineXmlDump
-from dumps.recombinejobs import RecombineXmlStub, RecombineXmlRecompressDump
+from dumps.recombinejobs import RecombineXmlStub, RecombineXmlRecompressDump, RecombineXmlLoggingDump
 from dumps.xmljobs import XmlLogging, XmlStub, AbstractDump
 from dumps.xmlcontentjobs import XmlDump, BigXmlDump
 from dumps.recompressjobs import XmlMultiStreamDump, XmlRecompressDump
@@ -266,7 +266,14 @@
                 self.find_item_by_name('metacurrentdump')))
 
         self.dump_items.append(
-            XmlLogging("Log events to all pages and users."))
+            XmlLogging("Log events to all pages and users.",
+                       self._get_partnum_todo("xmlpagelogsdump"),
+                       get_int_setting(self.jobsperbatch, "xmlpagelogsdump"),
+                       self.filepart.get_logitems_per_filepart_pagelogs()))
+
+        self.append_job_if_needed(RecombineXmlLoggingDump(
+            "xmlpagelogsdumprecombine", "Recombine Log events to all pages and 
users",
+            self.find_item_by_name('xmlpagelogsdump')))
 
         self.append_job_if_needed(
             FlowDump("xmlflowdump", "content of flow pages in xml format"))
diff --git a/xmldumps-backup/dumps/runnerutils.py b/xmldumps-backup/dumps/runnerutils.py
index 10b0295..12a77a5 100644
--- a/xmldumps-backup/dumps/runnerutils.py
+++ b/xmldumps-backup/dumps/runnerutils.py
@@ -841,6 +841,7 @@
                 self.wiki.config.pages_per_filepart_history,
                 self.wiki.config.revs_per_filepart_history,
                 self.wiki.config.numparts_for_abstract,
+                self.wiki.config.numparts_for_pagelogs,
                 self.wiki.config.pages_per_filepart_abstract,
                 self.wiki.config.recombine_history,
                 self.wiki.config.checkpoint_time]
@@ -894,9 +895,10 @@
         self.wiki.config.pages_per_filepart_history = settings[1]
         self.wiki.config.revs_per_filepart_history = settings[2]
         self.wiki.config.numparts_for_abstract = settings[3]
-        self.wiki.config.pages_per_filepart_abstract = settings[4]
-        self.wiki.config.recombine_history = settings[5]
-        self.wiki.config.checkpoint_time = settings[6]
+        self.wiki.config.numparts_for_pagelogs = settings[4]
+        self.wiki.config.pages_per_filepart_abstract = settings[5]
+        self.wiki.config.recombine_history = settings[6]
+        self.wiki.config.checkpoint_time = settings[7]
 
 
 class DumpRunJobData(object):
diff --git a/xmldumps-backup/dumps/utils.py b/xmldumps-backup/dumps/utils.py
index 9c2f040..2bdc291 100644
--- a/xmldumps-backup/dumps/utils.py
+++ b/xmldumps-backup/dumps/utils.py
@@ -463,6 +463,7 @@
 class PageAndEditStats(object):
     def __init__(self, wiki, db_name, error_callback=None):
         self.total_pages = None
+        self.total_logitems = None
         self.total_edits = None
         self.wiki = wiki
         self.db_name = db_name
@@ -487,6 +488,7 @@
         lines = results.splitlines()
         if lines and lines[1]:
             self.total_pages = int(lines[1])
+
         query = "select MAX(rev_id) from %srevision;" % 
self.db_server_info.db_table_prefix
         retries = 0
         results = None
@@ -501,10 +503,29 @@
         lines = results.splitlines()
         if lines and lines[1]:
             self.total_edits = int(lines[1])
+
+        query = "select MAX(log_id) from %slogging;" % 
self.db_server_info.db_table_prefix
+        retries = 0
+        results = None
+        results = self.db_server_info.run_sql_and_get_output(query)
+        while results is None and retries < maxretries:
+            retries = retries + 1
+            time.sleep(5)
+            results = self.db_server_info.run_sql_and_get_output(query)
+        if not results:
+            return 1
+
+        lines = results.splitlines()
+        if lines and lines[1]:
+            self.total_logitems = int(lines[1])
+
         return 0
 
     def get_total_pages(self):
         return self.total_pages
+
+    def get_total_logitems(self):
+        return self.total_logitems
 
     def get_total_edits(self):
         return self.total_edits
@@ -548,6 +569,17 @@
                 self._pages_per_filepart_abstract = self.convert_comma_sep(
                     self.wiki.config.pages_per_filepart_abstract)
 
+            if (self.wiki.config.numparts_for_pagelogs and
+                    self.wiki.config.numparts_for_pagelogs != "0"):
+                # we add 200 padding to cover new log entries that may be added
+                logitems_per_filepart = 200 + self.stats.total_logitems / int(
+                    self.wiki.config.numparts_for_pagelogs)
+                self._logitems_per_filepart_pagelogs = [logitems_per_filepart for i in range(
+                    0, int(self.wiki.config.numparts_for_pagelogs))]
+            else:
+                self._logitems_per_filepart_pagelogs = self.convert_comma_sep(
+                    self.wiki.config.logitems_per_filepart_pagelogs)
+
             self._pages_per_filepart_history = self.convert_comma_sep(
                 self.wiki.config.pages_per_filepart_history)
             self._revs_per_filepart_history = self.convert_comma_sep(
@@ -557,6 +589,7 @@
             self._pages_per_filepart_history = False
             self._revs_per_filepart_history = False
             self._pages_per_filepart_abstract = False
+            self._logitems_per_filepart_pagelogs = False
             self._recombine_history = False
         if self._parts_enabled:
             if self._revs_per_filepart_history:
@@ -595,6 +628,18 @@
             else:
                 self._num_parts_abstract = 0
 
+            if self._logitems_per_filepart_pagelogs:
+                if (len(self._logitems_per_filepart_pagelogs) == 1 and
+                        self._logitems_per_filepart_pagelogs[0]):
+                    self._num_parts_pagelogs = self.get_num_parts_for_xml_dumps(
+                        self.stats.total_logitems, self._logitems_per_filepart_pagelogs[0])
+                    self._logitems_per_filepart_pagelogs = [self._logitems_per_filepart_pagelogs[0]
+                                                            for i in range(self._num_parts_pagelogs)]
+                else:
+                    self._num_parts_pagelogs = len(self._logitems_per_filepart_pagelogs)
+            else:
+                self._num_parts_pagelogs = 0
+
     def convert_comma_sep(self, line):
         if line == "":
             return False
@@ -608,6 +653,9 @@
     def get_pages_per_filepart_abstract(self):
         return self._pages_per_filepart_abstract
 
+    def get_logitems_per_filepart_pagelogs(self):
+        return self._logitems_per_filepart_pagelogs
+
     def get_num_parts_abstract(self):
         return self._num_parts_abstract
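
As a rough illustration of the arithmetic above (a sketch, not part of the patch; the totals are hypothetical): with chunksForPagelogs=4 and a million log entries, each file part gets an equal share plus the 200-item padding, and the xmljobs.py change below turns the running sums into per-part --start/--end log ids, omitting --end for the last part:

    total_logitems = 1000000   # hypothetical MAX(log_id)
    numparts = 4               # hypothetical chunksForPagelogs

    # utils.py: equal share per part, padded by 200 for entries added mid-run
    per_part = 200 + total_logitems // numparts        # 250200
    parts = [per_part] * numparts

    # xmljobs.py: running sums become the --start/--end log ids per part;
    # the last part gets no --end and runs up to the newest log entry
    for partnum in range(1, numparts + 1):
        start = sum(parts[:partnum - 1]) + 1
        end = sum(parts[:partnum]) + 1 if partnum < numparts else None
        print(partnum, start, end)
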
 
diff --git a/xmldumps-backup/dumps/xmljobs.py b/xmldumps-backup/dumps/xmljobs.py
index 4419d42..3a59c28 100644
--- a/xmldumps-backup/dumps/xmljobs.py
+++ b/xmldumps-backup/dumps/xmljobs.py
@@ -195,7 +195,13 @@
 class XmlLogging(Dump):
     """ Create a logging dump of all page activity """
 
-    def __init__(self, desc, parts=False):
+    def __init__(self, desc, partnum_todo, jobsperbatch=None, parts=False):
+        self._partnum_todo = partnum_todo
+        self.jobsperbatch = jobsperbatch
+        self._parts = parts
+        if self._parts:
+            self._parts_enabled = True
+            self.onlyparts = True
         Dump.__init__(self, "xmlpagelogsdump", desc)
 
     def detail(self):
@@ -229,6 +235,19 @@
                    config_file_arg, "--wiki", runner.db_name,
                    "--outfile", DumpFilename.get_inprogress_name(logging_path)]
 
+        if output_dfname.partnum:
+            # set up start and end log item ids for this piece
+            # note there is no log item id 0, so we start with 1
+            start = sum([self._parts[i] for i in range(0, output_dfname.partnum_int - 1)]) + 1
+            startopt = "--start=%s" % start
+            # if we are on the last file part, we should get up to the last log item id,
+            # whatever that is.
+            command.append(startopt)
+            if output_dfname.partnum_int < len(self._parts):
+                end = sum([self._parts[i] for i in range(0, output_dfname.partnum_int)]) + 1
+                endopt = "--end=%s" % end
+                command.append(endopt)
+
         pipeline = [command]
         series = [pipeline]
         return series
@@ -236,14 +255,19 @@
     def run(self, runner):
         self.cleanup_old_files(runner.dump_dir, runner)
         dfnames = self.list_outfiles_for_build_command(runner.dump_dir)
-        if len(dfnames) > 1:
-            raise BackupError("logging table job wants to produce more than one output file")
-        output_dfname = dfnames[0]
-        command_series = self.build_command(runner, output_dfname)
-        self.setup_command_info(runner, command_series, [output_dfname])
-        error, broken = runner.run_command([command_series], callback_stderr=self.progress_callback,
-                                           callback_stderr_arg=runner,
-                                           callback_on_completion=self.command_completion_callback)
+        if self.jobsperbatch is not None:
+            maxjobs = self.jobsperbatch
+        else:
+            maxjobs = len(dfnames)
+        for batch in batcher(dfnames, maxjobs):
+            commands = []
+            for output_dfname in batch:
+                command_series = self.build_command(runner, output_dfname)
+                self.setup_command_info(runner, command_series, [output_dfname])
+                commands.append(command_series)
+            error, broken = runner.run_command(commands, callback_stderr=self.progress_callback,
+                                               callback_stderr_arg=runner,
+                                               callback_on_completion=self.command_completion_callback)
         if error:
             raise BackupError("error dumping log files")
 
diff --git a/xmldumps-backup/xmlstreams.py b/xmldumps-backup/xmlstreams.py
index 6f4c1de..3163fba 100644
--- a/xmldumps-backup/xmlstreams.py
+++ b/xmldumps-backup/xmlstreams.py
@@ -42,7 +42,9 @@
     if interval is None:
         # hope this is not too awful a guess
         interval = (int(end) - int(start)) / 50
-        if interval > max_interval:
+        if interval == 0:
+            interval = 1
+        elif interval > max_interval:
             interval = max_interval
 
     interval_save = interval
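
A minimal sketch of the adjusted interval guess in xmlstreams.py (illustrative only; the max_interval value here is hypothetical): very small id ranges used to produce an interval of 0, which is now bumped to 1.

    def guess_interval(start, end, max_interval=100000):  # max_interval value is hypothetical
        interval = (int(end) - int(start)) // 50
        if interval == 0:       # tiny id ranges used to yield 0 here
            interval = 1
        elif interval > max_interval:
            interval = max_interval
        return interval

    print(guess_interval(1, 30))    # was 0 before this change, now 1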

-- 
To view, visit https://gerrit.wikimedia.org/r/394857
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Icef0aa23c363d7fa4d3b09074571f02a9ed2d3c6
Gerrit-PatchSet: 11
Gerrit-Project: operations/dumps
Gerrit-Branch: master
Gerrit-Owner: ArielGlenn <ar...@wikimedia.org>
Gerrit-Reviewer: ArielGlenn <ar...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>
